blob: 548129f85d29297b24fdedaabc5c456ae298848a [file] [log] [blame]
Tomek OsiƄskie9ccf412018-01-13 19:44:11 +01001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.xmpp.core.ctl.handlers;
18
19import com.fasterxml.aalto.WFCException;
20
21import io.netty.channel.Channel;
22import io.netty.channel.ChannelHandlerContext;
23import io.netty.channel.ChannelInboundHandlerAdapter;
24import io.netty.channel.ChannelOutboundHandlerAdapter;
25import io.netty.channel.ChannelPromise;
26import io.netty.channel.CombinedChannelDuplexHandler;
27import org.dom4j.Element;
28import org.onosproject.xmpp.core.XmppDevice;
29import org.onosproject.xmpp.core.XmppDeviceFactory;
30import org.onosproject.xmpp.core.XmppSession;
31import org.onosproject.xmpp.core.ctl.exception.UnsupportedStanzaTypeException;
32import org.onosproject.xmpp.core.ctl.exception.XmppValidationException;
33
34import org.onosproject.xmpp.core.stream.XmppStreamClose;
35import org.onosproject.xmpp.core.stream.XmppStreamError;
36import org.onosproject.xmpp.core.stream.XmppStreamOpen;
37import org.slf4j.Logger;
38import org.slf4j.LoggerFactory;
39import org.xmpp.packet.Packet;
40
41
42import java.net.InetSocketAddress;
43import java.util.concurrent.ExecutorService;
44import java.util.concurrent.Executors;
45
46import static org.onlab.util.Tools.groupedThreads;
47
48/**
49 * Handles a XMPP channel related events and implements XMPP state machine.
50 */
51public class XmppChannelHandler extends CombinedChannelDuplexHandler implements XmppSession {
52
53 private final Logger logger = LoggerFactory.getLogger(getClass());
54
55 protected ExecutorService executorService =
56 Executors.newFixedThreadPool(32, groupedThreads("onos/xmpp", "message-stats-%d", logger));
57
58 protected volatile ChannelState state;
59 protected Channel channel;
60
61 protected XmppDevice xmppDevice;
62 private XmppDeviceFactory deviceFactory;
63
64 public XmppChannelHandler(XmppDeviceFactory deviceFactory) {
65 ChannelInboundHandlerAdapter inboundHandlerAdapter = new ChannelInboundHandlerAdapter();
66 ChannelOutboundHandlerAdapter outboundHandlerAdapter = new ChannelOutboundHandlerAdapter();
67 this.init(inboundHandlerAdapter, outboundHandlerAdapter);
68 this.state = ChannelState.IDLE;
69 this.deviceFactory = deviceFactory;
70 }
71
72 @Override
73 public boolean isActive() {
74 return channel.isActive();
75 }
76
77 @Override
78 public InetSocketAddress remoteAddress() {
79 return (InetSocketAddress) channel.remoteAddress();
80 }
81
82 @Override
83 public void closeSession() {
84 sendStreamCloseReply();
85 }
86
87 @Override
88 public boolean sendPacket(Packet xmppPacket) {
89 if (channel.isActive()) {
90 channel.writeAndFlush(xmppPacket, channel.voidPromise());
91 return true;
92 } else {
93 logger.warn("Dropping messages for device {} because channel is not connected: {}",
94 xmppDevice.getIpAddress(), xmppPacket);
95 return false;
96 }
97 }
98
99 enum XmppEvent {
100 XmppStreamClose, XmppStreamOpen, XmppStreamError, IQ, Message, Presence
101 }
102
103 enum ChannelState {
104
105 IDLE() {
106 @Override
107 void processStreamClose(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamClose msg) {
108 // ignore
109 }
110
111 @Override
112 void processStreamError(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamError error) {
113 // ignore
114 }
115
116 @Override
117 void processUpstreamXmppPacket(XmppChannelHandler handler, ChannelHandlerContext ctx, Object msg) {
118 // ignore
119 handler.logger.info("XMPP Packet in state IDLE received. Packet ignored..");
120 }
121 },
122
123 WAIT_STREAM_CLOSE() {
124 @Override
125 void processDownstreamXmppEvent(XmppChannelHandler handler, ChannelHandlerContext ctx, Object msg) {
126 /**
127 * Block all downstream events during WAIT_STREAM_CLOSE.
128 *
129 * RFC 6120
130 * 4.4 Closing a Stream
131 * "2. Refrain from sending any further data over its outbound stream to the other entity,
132 * but continue to process data received from the other entity (and, if necessary, process such data)."
133 */
134 }
135
136 @Override
137 void processStreamClose(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamClose msg) {
138 handler.xmppDevice.disconnectDevice();
139 handler.closeChannel();
140 handler.setState(IDLE);
141 }
142
143 @Override
144 void processStreamOpen(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamOpen streamOpen) {
145 // ignore
146 }
147 },
148
149 STREAM_OPEN() {
150 @Override
151 void processStreamOpen(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamOpen streamOpen) {
152 // ignore
153 }
154
155 @Override
156 void processDownstreamXmppEvent(XmppChannelHandler handler, ChannelHandlerContext ctx, Object msg) {
157 if (msg instanceof XmppStreamClose) {
158 handler.setState(ChannelState.WAIT_STREAM_CLOSE);
159 }
160 ctx.writeAndFlush(msg);
161 }
162 };
163
164 void processStreamError(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamError streamError) {
165 handler.handleStreamError(streamError);
166 }
167
168 void processStreamOpen(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamOpen xmppStreamOpen) {
169 handler.xmppDevice = handler.deviceFactory.getXmppDevice(xmppStreamOpen.getFromJid(), handler);
170 handler.sendStreamOpenReply(xmppStreamOpen);
171 handler.xmppDevice.registerConnectedDevice();
172 handler.setState(STREAM_OPEN);
173 }
174
175 void processStreamClose(XmppChannelHandler handler, ChannelHandlerContext ctx, XmppStreamClose msg) {
176 handler.sendStreamCloseReply();
177 handler.xmppDevice.disconnectDevice();
178 }
179
180 void processUpstreamXmppPacket(XmppChannelHandler handler, ChannelHandlerContext ctx, Object msg) {
181 handler.executorService.execute(new XmppPacketHandler(handler.xmppDevice, ctx, (Packet) msg));
182 }
183
184 void processDownstreamXmppEvent(XmppChannelHandler handler, ChannelHandlerContext ctx, Object msg) {
185 ctx.writeAndFlush(msg);
186 }
187
188 void processUpstreamXmppEvent(XmppChannelHandler handler, ChannelHandlerContext ctx, Object msg) {
189 XmppEvent event = XmppEvent.valueOf(msg.getClass().getSimpleName());
190 handler.logger.info("XMPP event {} received in STATE={} for device: {}",
191 event, handler.state, ctx.channel().remoteAddress());
192 switch (event) {
193 case XmppStreamOpen:
194 handler.state.processStreamOpen(handler, ctx, (XmppStreamOpen) msg);
195 break;
196 case XmppStreamClose:
197 handler.state.processStreamClose(handler, ctx, (XmppStreamClose) msg);
198 break;
199 case XmppStreamError:
200 handler.state.processStreamError(handler, ctx, (XmppStreamError) msg);
201 break;
202 case IQ:
203 case Message:
204 case Presence:
205 handler.state.processUpstreamXmppPacket(handler, ctx, msg);
206 break;
207 default:
208 break;
209 }
210 }
211 }
212
213 private void closeChannel() {
214 if (channel != null) {
215 channel.close();
216 }
217 }
218
219 private void handleStreamError(XmppStreamError streamError) {
220 // TODO: handle stream errors
221 }
222
223 private void sendStreamCloseReply() {
224 XmppStreamClose streamClose = new XmppStreamClose();
225 channel.writeAndFlush(streamClose);
226 }
227
228 private void sendStreamOpenReply(XmppStreamOpen xmppStreamOpen) {
229 Element element = xmppStreamOpen.getElement().createCopy();
230 element.addAttribute("from", xmppStreamOpen.getToJid().toString());
231 element.addAttribute("to", xmppStreamOpen.getFromJid().toString());
232 XmppStreamOpen xmppStreamOpenReply = new XmppStreamOpen(element);
233 channel.writeAndFlush(xmppStreamOpenReply);
234 }
235
236 private void sendStreamError(XmppStreamError.Condition condition) {
237 XmppStreamError error = new XmppStreamError(condition);
238 channel.writeAndFlush(error);
239 }
240
241 private void handleChannelException(Throwable cause) {
242 XmppStreamError.Condition condition = getStreamErrorCondition(cause.getCause());
243 sendStreamError(condition);
244 sendStreamCloseReply();
245 }
246
247 protected void setState(ChannelState state) {
248 logger.info("Transition from state {} to {}", this.state, state);
249 this.state = state;
250 }
251
252 @Override
253 public void channelActive(ChannelHandlerContext ctx) throws Exception {
254 channel = ctx.channel();
255 logger.info("New device connection from {}",
256 channel.remoteAddress());
257 this.state = ChannelState.IDLE;
258 }
259
260 @Override
261 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
262 this.state.processUpstreamXmppEvent(this, ctx, msg);
263 }
264
265 @Override
266 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
267 throws Exception {
268 logger.info("Exception caught: {}", cause.getMessage());
269 handleChannelException(cause.getCause());
270 }
271
272 private XmppStreamError.Condition getStreamErrorCondition(Throwable cause) {
273 //TODO: add error handle mechanisms for each cases
274 if (cause instanceof UnsupportedStanzaTypeException) {
275 return XmppStreamError.Condition.unsupported_stanza_type;
276 } else if (cause instanceof WFCException) {
277 return XmppStreamError.Condition.bad_format;
278 } else if (cause instanceof XmppValidationException) {
279 return XmppStreamError.Condition.bad_format;
280 } else {
281 return XmppStreamError.Condition.internal_server_error;
282 }
283 }
284
285 @Override
286 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
287 this.state.processDownstreamXmppEvent(this, ctx, msg);
288 logger.info("Writing packet... Current State " + this.state.toString());
289 }
290
291 /**
292 * XMPP message handler.
293 */
294 private static final class XmppPacketHandler implements Runnable {
295
296 protected final ChannelHandlerContext ctx;
297 protected final Packet packet;
298 protected final XmppDevice xmppDevice;
299
300 public XmppPacketHandler(XmppDevice xmppDevice, ChannelHandlerContext ctx, Packet packet) {
301 this.ctx = ctx;
302 this.packet = packet;
303 this.xmppDevice = xmppDevice;
304 }
305
306 @Override
307 public void run() {
308 xmppDevice.handlePacket(packet);
309 }
310 }
311}