blob: c957153560f088178520bc546fc48a18f75b3d23 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Madan Jampaniab6d3112014-10-02 16:30:14 -070016package org.onlab.netty;
17
Madan Jampani86ed0552014-10-03 16:45:42 -070018import static com.google.common.base.Preconditions.checkState;
19import io.netty.buffer.ByteBuf;
20import io.netty.channel.ChannelHandlerContext;
21import io.netty.handler.codec.ReplayingDecoder;
22
Madan Jampaniab6d3112014-10-02 16:30:14 -070023import java.util.Arrays;
24import java.util.List;
25
Madan Jampani29e5dfd2014-10-07 17:26:25 -070026import org.slf4j.Logger;
27import org.slf4j.LoggerFactory;
28
Madan Jampani938aa432014-10-04 17:37:23 -070029/**
30 * Decoder for inbound messages.
31 */
32public class MessageDecoder extends ReplayingDecoder<DecoderState> {
Madan Jampaniab6d3112014-10-02 16:30:14 -070033
Madan Jampani29e5dfd2014-10-07 17:26:25 -070034 private final Logger log = LoggerFactory.getLogger(getClass());
35
Madan Jampaniab6d3112014-10-02 16:30:14 -070036 private final NettyMessagingService messagingService;
Madan Jampani53e44e62014-10-07 12:39:51 -070037
38 private static final KryoSerializer SERIALIZER = new KryoSerializer();
Madan Jampaniab6d3112014-10-02 16:30:14 -070039
Madan Jampani938aa432014-10-04 17:37:23 -070040 private int contentLength;
41
Madan Jampani53e44e62014-10-07 12:39:51 -070042 public MessageDecoder(NettyMessagingService messagingService) {
Madan Jampani938aa432014-10-04 17:37:23 -070043 super(DecoderState.READ_HEADER_VERSION);
Madan Jampaniab6d3112014-10-02 16:30:14 -070044 this.messagingService = messagingService;
Madan Jampaniab6d3112014-10-02 16:30:14 -070045 }
46
47 @Override
Madan Jampani86ed0552014-10-03 16:45:42 -070048 protected void decode(
49 ChannelHandlerContext context,
50 ByteBuf buffer,
51 List<Object> out) throws Exception {
Madan Jampaniab6d3112014-10-02 16:30:14 -070052
Madan Jampani938aa432014-10-04 17:37:23 -070053 switch(state()) {
54 case READ_HEADER_VERSION:
55 int headerVersion = buffer.readInt();
56 checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
57 checkpoint(DecoderState.READ_PREAMBLE);
58 case READ_PREAMBLE:
59 byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
60 buffer.readBytes(preamble);
61 checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
62 checkpoint(DecoderState.READ_CONTENT_LENGTH);
63 case READ_CONTENT_LENGTH:
64 contentLength = buffer.readInt();
65 checkpoint(DecoderState.READ_SERIALIZER_VERSION);
66 case READ_SERIALIZER_VERSION:
67 int serializerVersion = buffer.readInt();
68 checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
69 checkpoint(DecoderState.READ_CONTENT);
70 case READ_CONTENT:
Madan Jampani53e44e62014-10-07 12:39:51 -070071 InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
Madan Jampani938aa432014-10-04 17:37:23 -070072 message.setMessagingService(messagingService);
73 out.add(message);
74 checkpoint(DecoderState.READ_HEADER_VERSION);
75 break;
76 default:
77 checkState(false, "Must not be here");
78 }
Madan Jampaniab6d3112014-10-02 16:30:14 -070079 }
Madan Jampani29e5dfd2014-10-07 17:26:25 -070080
81 @Override
82 public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
83 log.error("Exception inside channel handling pipeline.", cause);
84 context.close();
85 }
Madan Jampaniab6d3112014-10-02 16:30:14 -070086}