blob: b494ce9dd9ef18994ba2af723868163eb87ac029 [file] [log] [blame]
Madan Jampaniab6d3112014-10-02 16:30:14 -07001package org.onlab.netty;
2
Madan Jampani86ed0552014-10-03 16:45:42 -07003import static com.google.common.base.Preconditions.checkState;
4import io.netty.buffer.ByteBuf;
5import io.netty.channel.ChannelHandlerContext;
6import io.netty.handler.codec.ReplayingDecoder;
7
Madan Jampaniab6d3112014-10-02 16:30:14 -07008import java.util.Arrays;
9import java.util.List;
10
Madan Jampani29e5dfd2014-10-07 17:26:25 -070011import org.slf4j.Logger;
12import org.slf4j.LoggerFactory;
13
Madan Jampani938aa432014-10-04 17:37:23 -070014/**
15 * Decoder for inbound messages.
16 */
17public class MessageDecoder extends ReplayingDecoder<DecoderState> {
Madan Jampaniab6d3112014-10-02 16:30:14 -070018
Madan Jampani29e5dfd2014-10-07 17:26:25 -070019 private final Logger log = LoggerFactory.getLogger(getClass());
20
Madan Jampaniab6d3112014-10-02 16:30:14 -070021 private final NettyMessagingService messagingService;
Madan Jampani53e44e62014-10-07 12:39:51 -070022
23 private static final KryoSerializer SERIALIZER = new KryoSerializer();
Madan Jampaniab6d3112014-10-02 16:30:14 -070024
Madan Jampani938aa432014-10-04 17:37:23 -070025 private int contentLength;
26
Madan Jampani53e44e62014-10-07 12:39:51 -070027 public MessageDecoder(NettyMessagingService messagingService) {
Madan Jampani938aa432014-10-04 17:37:23 -070028 super(DecoderState.READ_HEADER_VERSION);
Madan Jampaniab6d3112014-10-02 16:30:14 -070029 this.messagingService = messagingService;
Madan Jampaniab6d3112014-10-02 16:30:14 -070030 }
31
32 @Override
Madan Jampani86ed0552014-10-03 16:45:42 -070033 protected void decode(
34 ChannelHandlerContext context,
35 ByteBuf buffer,
36 List<Object> out) throws Exception {
Madan Jampaniab6d3112014-10-02 16:30:14 -070037
Madan Jampani938aa432014-10-04 17:37:23 -070038 switch(state()) {
39 case READ_HEADER_VERSION:
40 int headerVersion = buffer.readInt();
41 checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
42 checkpoint(DecoderState.READ_PREAMBLE);
43 case READ_PREAMBLE:
44 byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
45 buffer.readBytes(preamble);
46 checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
47 checkpoint(DecoderState.READ_CONTENT_LENGTH);
48 case READ_CONTENT_LENGTH:
49 contentLength = buffer.readInt();
50 checkpoint(DecoderState.READ_SERIALIZER_VERSION);
51 case READ_SERIALIZER_VERSION:
52 int serializerVersion = buffer.readInt();
53 checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
54 checkpoint(DecoderState.READ_CONTENT);
55 case READ_CONTENT:
Madan Jampani53e44e62014-10-07 12:39:51 -070056 InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
Madan Jampani938aa432014-10-04 17:37:23 -070057 message.setMessagingService(messagingService);
58 out.add(message);
59 checkpoint(DecoderState.READ_HEADER_VERSION);
60 break;
61 default:
62 checkState(false, "Must not be here");
63 }
Madan Jampaniab6d3112014-10-02 16:30:14 -070064 }
Madan Jampani29e5dfd2014-10-07 17:26:25 -070065
66 @Override
67 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
68 log.error("Exception inside channel handling pipeline.", cause);
69 context.close();
70 }
Madan Jampaniab6d3112014-10-02 16:30:14 -070071}