Sflow ChannelHandler and MessageDecoder

The sFlow byte steam is decoded in the message decoder,
converted to an sFlow object, and then received in the chennal handler.

Change-Id: Ib358489424e4a09bc415278e26b761a03a98015a
diff --git a/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowChannelHandler.java b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowChannelHandler.java
new file mode 100644
index 0000000..d2500c5
--- /dev/null
+++ b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowChannelHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2024-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.sflow.impl;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onosproject.sflow.SflowController;
+
+/**
+ * Channel handler deals with the netfow exporter connection and dispatches messages
+ * from netfow exporter to the appropriate locations.
+ */
+public class SflowChannelHandler extends SimpleChannelHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(SflowChannelHandler.class);
+
+    private Channel channel;
+
+    private SflowController controller;
+
+    /**
+     * Create a new sFlow channelHandler instance.
+     *
+     * @param controller sFlow controller.
+     */
+    SflowChannelHandler(SflowController controller) {
+        this.controller = controller;
+    }
+
+    /**
+     * sFlow channel connect to sFlow exporter.
+     *
+     * @param ctx   channel handler context
+     * @param event channel state event
+     * @throws Exception on error while connecting channel
+     */
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception {
+        channel = event.getChannel();
+    }
+
+    /**
+     * sFlow channel disconnect to sFlow exporter.
+     *
+     * @param ctx   channel handler context
+     * @param event channel state event
+     * @throws Exception on error while disconnecting channel
+     */
+    @Override
+    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception {
+        channel = event.getChannel();
+    }
+
+    /**
+     * sFlow channel exception to sFlow exporter.
+     *
+     * @param ctx   channel handler context
+     * @param event channel exception event
+     * @throws Exception on error while parsing exception
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) throws Exception {
+        //TODO exception handler
+    }
+
+    /**
+     * sFlow message receive from sFlow exporter.
+     *
+     * @param ctx   channel handler context
+     * @param event channel message event
+     * @throws Exception on error while parsing exception
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
+        channel = event.getChannel();
+    }
+
+}
+
diff --git a/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowMessageDecoder.java b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowMessageDecoder.java
new file mode 100644
index 0000000..9d8c93a
--- /dev/null
+++ b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowMessageDecoder.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2024-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.sflow.impl;
+
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decode an sFlow message from a Channel, for use in a netty pipeline.
+ */
+public class SflowMessageDecoder extends FrameDecoder {
+
+    private static final Logger log = LoggerFactory.getLogger(SflowMessageDecoder.class);
+
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
+
+        try {
+            if (buffer.readableBytes() > 0) {
+                byte[] bytes = new byte[buffer.readableBytes()];
+                buffer.readBytes(bytes);
+                ctx.setAttachment(null);
+                return null;
+            }
+        } catch (Exception e) {
+            log.error("sFlow message decode error ", e);
+            buffer.resetReaderIndex();
+            buffer.discardReadBytes();
+        }
+        return null;
+    }
+
+}
+
diff --git a/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPipelineFactory.java b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPipelineFactory.java
index 6c70337..37df479 100644
--- a/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPipelineFactory.java
+++ b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPipelineFactory.java
@@ -45,7 +45,11 @@
      */
     @Override
     public ChannelPipeline getPipeline() throws Exception {
+        SflowChannelHandler handler = new SflowChannelHandler(controller);
+
         ChannelPipeline pipeline = Channels.pipeline();
+        pipeline.addLast("sflowmessagedecoder", new SflowMessageDecoder());
+        pipeline.addLast("ActiveHandler", handler);
         return pipeline;
     }