Sflow ChannelHandler and MessageDecoder
The sFlow byte steam is decoded in the message decoder,
converted to an sFlow object, and then received in the chennal handler.
Change-Id: Ib358489424e4a09bc415278e26b761a03a98015a
diff --git a/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowChannelHandler.java b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowChannelHandler.java
new file mode 100644
index 0000000..d2500c5
--- /dev/null
+++ b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowChannelHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2024-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.sflow.impl;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onosproject.sflow.SflowController;
+
+/**
+ * Channel handler deals with the netfow exporter connection and dispatches messages
+ * from netfow exporter to the appropriate locations.
+ */
+public class SflowChannelHandler extends SimpleChannelHandler {
+
+ private static final Logger log = LoggerFactory.getLogger(SflowChannelHandler.class);
+
+ private Channel channel;
+
+ private SflowController controller;
+
+ /**
+ * Create a new sFlow channelHandler instance.
+ *
+ * @param controller sFlow controller.
+ */
+ SflowChannelHandler(SflowController controller) {
+ this.controller = controller;
+ }
+
+ /**
+ * sFlow channel connect to sFlow exporter.
+ *
+ * @param ctx channel handler context
+ * @param event channel state event
+ * @throws Exception on error while connecting channel
+ */
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception {
+ channel = event.getChannel();
+ }
+
+ /**
+ * sFlow channel disconnect to sFlow exporter.
+ *
+ * @param ctx channel handler context
+ * @param event channel state event
+ * @throws Exception on error while disconnecting channel
+ */
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception {
+ channel = event.getChannel();
+ }
+
+ /**
+ * sFlow channel exception to sFlow exporter.
+ *
+ * @param ctx channel handler context
+ * @param event channel exception event
+ * @throws Exception on error while parsing exception
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) throws Exception {
+ //TODO exception handler
+ }
+
+ /**
+ * sFlow message receive from sFlow exporter.
+ *
+ * @param ctx channel handler context
+ * @param event channel message event
+ * @throws Exception on error while parsing exception
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
+ channel = event.getChannel();
+ }
+
+}
+
diff --git a/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowMessageDecoder.java b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowMessageDecoder.java
new file mode 100644
index 0000000..9d8c93a
--- /dev/null
+++ b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowMessageDecoder.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2024-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.sflow.impl;
+
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decode an sFlow message from a Channel, for use in a netty pipeline.
+ */
+public class SflowMessageDecoder extends FrameDecoder {
+
+ private static final Logger log = LoggerFactory.getLogger(SflowMessageDecoder.class);
+
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
+
+ try {
+ if (buffer.readableBytes() > 0) {
+ byte[] bytes = new byte[buffer.readableBytes()];
+ buffer.readBytes(bytes);
+ ctx.setAttachment(null);
+ return null;
+ }
+ } catch (Exception e) {
+ log.error("sFlow message decode error ", e);
+ buffer.resetReaderIndex();
+ buffer.discardReadBytes();
+ }
+ return null;
+ }
+
+}
+
diff --git a/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPipelineFactory.java b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPipelineFactory.java
index 6c70337..37df479 100644
--- a/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPipelineFactory.java
+++ b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPipelineFactory.java
@@ -45,7 +45,11 @@
*/
@Override
public ChannelPipeline getPipeline() throws Exception {
+ SflowChannelHandler handler = new SflowChannelHandler(controller);
+
ChannelPipeline pipeline = Channels.pipeline();
+ pipeline.addLast("sflowmessagedecoder", new SflowMessageDecoder());
+ pipeline.addLast("ActiveHandler", handler);
return pipeline;
}