blob: 3ed3216460844a347f9177b670507981bec9a1de [file] [log] [blame]
Madan Jampaniab6d3112014-10-02 16:30:14 -07001package org.onlab.netty;
2
Madan Jampani86ed0552014-10-03 16:45:42 -07003import static com.google.common.base.Preconditions.checkState;
4import io.netty.buffer.ByteBuf;
5import io.netty.channel.ChannelHandlerContext;
6import io.netty.handler.codec.ReplayingDecoder;
7
Madan Jampaniab6d3112014-10-02 16:30:14 -07008import java.util.Arrays;
9import java.util.List;
10
Madan Jampani938aa432014-10-04 17:37:23 -070011/**
12 * Decoder for inbound messages.
13 */
14public class MessageDecoder extends ReplayingDecoder<DecoderState> {
Madan Jampaniab6d3112014-10-02 16:30:14 -070015
16 private final NettyMessagingService messagingService;
17 private final Serializer serializer;
18
Madan Jampani938aa432014-10-04 17:37:23 -070019 private int contentLength;
20
Madan Jampaniab6d3112014-10-02 16:30:14 -070021 public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
Madan Jampani938aa432014-10-04 17:37:23 -070022 super(DecoderState.READ_HEADER_VERSION);
Madan Jampaniab6d3112014-10-02 16:30:14 -070023 this.messagingService = messagingService;
24 this.serializer = serializer;
25 }
26
27 @Override
Madan Jampani86ed0552014-10-03 16:45:42 -070028 protected void decode(
29 ChannelHandlerContext context,
30 ByteBuf buffer,
31 List<Object> out) throws Exception {
Madan Jampaniab6d3112014-10-02 16:30:14 -070032
Madan Jampani938aa432014-10-04 17:37:23 -070033 switch(state()) {
34 case READ_HEADER_VERSION:
35 int headerVersion = buffer.readInt();
36 checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
37 checkpoint(DecoderState.READ_PREAMBLE);
38 case READ_PREAMBLE:
39 byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
40 buffer.readBytes(preamble);
41 checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
42 checkpoint(DecoderState.READ_CONTENT_LENGTH);
43 case READ_CONTENT_LENGTH:
44 contentLength = buffer.readInt();
45 checkpoint(DecoderState.READ_SERIALIZER_VERSION);
46 case READ_SERIALIZER_VERSION:
47 int serializerVersion = buffer.readInt();
48 checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
49 checkpoint(DecoderState.READ_CONTENT);
50 case READ_CONTENT:
51 InternalMessage message = serializer.deserialize(buffer.readBytes(contentLength).nioBuffer());
52 message.setMessagingService(messagingService);
53 out.add(message);
54 checkpoint(DecoderState.READ_HEADER_VERSION);
55 break;
56 default:
57 checkState(false, "Must not be here");
58 }
Madan Jampaniab6d3112014-10-02 16:30:14 -070059 }
60}