blob: a11af4e63f386c3c91a1f0f27af872c81c69b092 [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Yuta HIGUCHIe3ae8212017-04-20 10:18:41 -070017package org.onosproject.netconf.ctl.impl;
Andrea Campanella101417d2015-12-11 17:58:07 -080018
import com.google.common.collect.Lists;
import org.onosproject.netconf.NetconfDeviceInfo;
import org.onosproject.netconf.NetconfDeviceOutputEvent;
import org.onosproject.netconf.NetconfDeviceOutputEventListener;
import org.onosproject.netconf.NetconfException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.regex.MatchResult;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Andrea Campanella101417d2015-12-11 17:58:07 -080044
/**
 * Thread that gets spawned each time a session is established and handles all the input
 * and output from the session's streams to and from the NETCONF device the session is
 * established with.
 */
50public class NetconfStreamThread extends Thread implements NetconfStreamHandler {
51
52 private static final Logger log = LoggerFactory
53 .getLogger(NetconfStreamThread.class);
Andrea Campanella1311ea02016-03-04 17:51:25 -080054 private static final String HELLO = "<hello";
Andrea Campanella101417d2015-12-11 17:58:07 -080055 private static final String END_PATTERN = "]]>]]>";
56 private static final String RPC_REPLY = "rpc-reply";
57 private static final String RPC_ERROR = "rpc-error";
heisenbergb7017d72016-04-13 02:16:07 -070058 private static final String NOTIFICATION_LABEL = "<notification";
Andreas Papazoisd4712e22016-02-10 15:59:55 +020059 private static final String MESSAGE_ID = "message-id=";
Konstantinos Kanonakis1b8b5592016-09-09 14:34:37 -050060 private static final Pattern MSGID_PATTERN = Pattern.compile(MESSAGE_ID + "\"(\\d+)\"");
Kamil Stasiak9f59f442017-05-02 11:02:24 +020061 private static final String MSGLEN_REGEX_PATTERN = "\n#\\d+\n";
62 // pattern to verify whole Chunked-Message format
63 private static final Pattern CHUNKED_FRAMING_PATTERN =
64 Pattern.compile("(\\n#([1-9][0-9]*)\\n(.+))+\\n##\\n", Pattern.DOTALL);
65 private static final String MSGLEN_PART_REGEX_PATTERN = "\\d+\n";
66 private static final String CHUNKED_END_REGEX_PATTERN = "\n##\n";
67 // pattern to parse each chunk-size in ChunkedMessage chunk
68 private static final Pattern CHUNKED_SIZE_PATTERN = Pattern.compile("\\n#([1-9][0-9]*)\\n");
69 private static final String HASH = "#";
70 private static final char HASH_CHAR = '#';
71 private static final String LF = "\n";
72 private static final char LF_CHAR = '\n';
Andrea Campanella101417d2015-12-11 17:58:07 -080073
Sean Condon7347de92017-07-21 12:17:25 +010074 private OutputStreamWriter outputStream;
Andrea Campanellab029b9e2016-01-29 11:05:36 -080075 private final InputStream err;
76 private final InputStream in;
77 private NetconfDeviceInfo netconfDeviceInfo;
78 private NetconfSessionDelegate sessionDelegate;
79 private NetconfMessageState state;
helenyrwu0407c642016-06-09 12:01:30 -070080 private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
81 = Lists.newCopyOnWriteArrayList();
82 private boolean enableNotifications = true;
Sean Condond2c8d472017-02-17 17:09:39 +000083 private Map<Integer, CompletableFuture<String>> replies;
Andrea Campanella101417d2015-12-11 17:58:07 -080084
/**
 * Builds the stream handler for one NETCONF session and starts it as a thread right away.
 *
 * @param in         stream the device's messages are read from
 * @param out        stream requests are written to; wrapped in a UTF-8 writer
 * @param err        error stream of the session; only stored here
 * @param deviceInfo description of the device, used in log messages and events
 * @param delegate   session delegate that is notified about device replies
 * @param replies    map of message-id to pending reply future, filled by {@code sendMessage}
 */
public NetconfStreamThread(final InputStream in, final OutputStream out,
                           final InputStream err, NetconfDeviceInfo deviceInfo,
                           NetconfSessionDelegate delegate,
                           Map<Integer, CompletableFuture<String>> replies) {
    this.replies = replies;
    this.sessionDelegate = delegate;
    this.state = NetconfMessageState.NO_MATCHING_PATTERN;
    this.netconfDeviceInfo = deviceInfo;
    this.outputStream = new OutputStreamWriter(out, StandardCharsets.UTF_8);
    this.err = err;
    this.in = in;
    log.debug("Stream thread for device {} session started", deviceInfo);
    // Must stay last: run() reads the fields assigned above.
    start();
}
99
100 @Override
101 public CompletableFuture<String> sendMessage(String request) {
Sean Condond2c8d472017-02-17 17:09:39 +0000102 Optional<Integer> messageId = getMsgId(request);
103 return sendMessage(request, messageId.get());
104 }
105
106 @Override
107 public CompletableFuture<String> sendMessage(String request, int messageId) {
Andrea Campanellab029b9e2016-01-29 11:05:36 -0800108 log.debug("Sending message {} to device {}", request, netconfDeviceInfo);
Sean Condond2c8d472017-02-17 17:09:39 +0000109 CompletableFuture<String> cf = new CompletableFuture<>();
110 replies.put(messageId, cf);
111
112 synchronized (outputStream) {
Sean Condon7347de92017-07-21 12:17:25 +0100113 try {
114 outputStream.write(request);
115 outputStream.flush();
116 } catch (IOException e) {
117 log.error("Writing to {} failed", netconfDeviceInfo, e);
118 cf.completeExceptionally(e);
119 }
Sean Condond2c8d472017-02-17 17:09:39 +0000120 }
121
122 return cf;
Andrea Campanella101417d2015-12-11 17:58:07 -0800123 }
124
/**
 * Character-driven state machine that recognises the two message terminators
 * {@code "]]>]]>"} and {@code "\n##\n"} in the device's output.
 *
 * <p>Each constant names the part of a terminator matched so far. When a character
 * does not extend the current partial match, it is re-evaluated as the possible start
 * of a new terminator instead of being discarded, so sequences such as
 * {@code "]\n##\n"} or {@code "]]]>]]>"} are still recognised.
 */
public enum NetconfMessageState {

    /** Nothing matched yet. */
    NO_MATCHING_PATTERN {
        @Override
        NetconfMessageState evaluateChar(char c) {
            return restart(c);
        }
    },
    /** Matched {@code "]"}. */
    FIRST_BRACKET {
        @Override
        NetconfMessageState evaluateChar(char c) {
            return c == ']' ? SECOND_BRACKET : restart(c);
        }
    },
    /** Matched {@code "]]"}. */
    SECOND_BRACKET {
        @Override
        NetconfMessageState evaluateChar(char c) {
            if (c == '>') {
                return FIRST_BIGGER;
            }
            // One more ']' still leaves "]]" as the trailing partial match.
            return c == ']' ? this : restart(c);
        }
    },
    /** Matched {@code "]]>"}. */
    FIRST_BIGGER {
        @Override
        NetconfMessageState evaluateChar(char c) {
            return c == ']' ? THIRD_BRACKET : restart(c);
        }
    },
    /** Matched {@code "]]>]"}. */
    THIRD_BRACKET {
        @Override
        NetconfMessageState evaluateChar(char c) {
            return c == ']' ? ENDING_BIGGER : restart(c);
        }
    },
    /** Matched {@code "]]>]]"}. */
    ENDING_BIGGER {
        @Override
        NetconfMessageState evaluateChar(char c) {
            if (c == '>') {
                return END_PATTERN;
            }
            // "]]>]]]" ends in "]]", which is again a valid partial match.
            return c == ']' ? SECOND_BRACKET : restart(c);
        }
    },
    /** Matched {@code "\n"}. */
    FIRST_LF {
        @Override
        NetconfMessageState evaluateChar(char c) {
            return c == '#' ? FIRST_HASH : restart(c);
        }
    },
    /** Matched {@code "\n#"}. */
    FIRST_HASH {
        @Override
        NetconfMessageState evaluateChar(char c) {
            return c == '#' ? SECOND_HASH : restart(c);
        }
    },
    /** Matched {@code "\n##"}. */
    SECOND_HASH {
        @Override
        NetconfMessageState evaluateChar(char c) {
            return c == '\n' ? END_CHUNKED_PATTERN : restart(c);
        }
    },
    /** Matched the complete {@code "\n##\n"}; the next character starts a fresh match. */
    END_CHUNKED_PATTERN {
        @Override
        NetconfMessageState evaluateChar(char c) {
            return restart(c);
        }
    },
    /** Matched the complete {@code "]]>]]>"}; the next character starts a fresh match. */
    END_PATTERN {
        @Override
        NetconfMessageState evaluateChar(char c) {
            return restart(c);
        }
    };

    /**
     * Consumes one character and returns the resulting state.
     *
     * @param c next character read from the device
     * @return state after {@code c} has been taken into account
     */
    abstract NetconfMessageState evaluateChar(char c);

    /**
     * State reached when {@code c} is the first character looked at, i.e. when no
     * earlier partial match can be extended.
     */
    private static NetconfMessageState restart(char c) {
        if (c == ']') {
            return FIRST_BRACKET;
        }
        if (c == '\n') {
            return FIRST_LF;
        }
        return NO_MATCHING_PATTERN;
    }
}
238
Yuta HIGUCHI0454d702017-03-17 10:08:38 -0700239 @Override
Andrea Campanellab029b9e2016-01-29 11:05:36 -0800240 public void run() {
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200241 BufferedReader bufferReader = null;
242 while (bufferReader == null) {
Andrea Campanella101417d2015-12-11 17:58:07 -0800243 try {
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200244 bufferReader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
245 } catch (UnsupportedEncodingException e) {
246 e.printStackTrace();
247 }
248 }
249
250 try {
251 boolean socketClosed = false;
252 StringBuilder deviceReplyBuilder = new StringBuilder();
253 while (!socketClosed) {
254 int cInt = bufferReader.read();
255 if (cInt == -1) {
256 log.debug("Netconf device {} sent error char in session," +
257 " will need to be reopend", netconfDeviceInfo);
258 NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
259 NetconfDeviceOutputEvent.Type.SESSION_CLOSED,
260 null, null, Optional.of(-1), netconfDeviceInfo);
261 netconfDeviceEventListeners.forEach(
262 listener -> listener.event(event));
263 socketClosed = true;
264 log.debug("Netconf device {} ERROR cInt == -1 socketClosed = true", netconfDeviceInfo);
265 }
266 char c = (char) cInt;
267 state = state.evaluateChar(c);
268 deviceReplyBuilder.append(c);
269 if (state == NetconfMessageState.END_PATTERN) {
270 String deviceReply = deviceReplyBuilder.toString();
271 if (deviceReply.equals(END_PATTERN)) {
Andrea Campanella1311ea02016-03-04 17:51:25 -0800272 socketClosed = true;
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200273 close(deviceReply);
274 } else {
275 deviceReply = deviceReply.replace(END_PATTERN, "");
276 dealWithReply(deviceReply);
277 deviceReplyBuilder.setLength(0);
Andrea Campanella101417d2015-12-11 17:58:07 -0800278 }
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200279 } else if (state == NetconfMessageState.END_CHUNKED_PATTERN) {
280 String deviceReply = deviceReplyBuilder.toString();
281 if (!validateChunkedFraming(deviceReply)) {
282 log.debug("Netconf device {} send badly framed message {}",
283 netconfDeviceInfo, deviceReply);
284 socketClosed = true;
285 close(deviceReply);
286 } else {
287 deviceReply = deviceReply.replaceAll(MSGLEN_REGEX_PATTERN, "");
288 deviceReply = deviceReply.replaceAll(CHUNKED_END_REGEX_PATTERN, "");
289 dealWithReply(deviceReply);
290 deviceReplyBuilder.setLength(0);
Andrea Campanella101417d2015-12-11 17:58:07 -0800291 }
292 }
Andrea Campanella101417d2015-12-11 17:58:07 -0800293 }
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200294 } catch (IOException e) {
295 log.warn("Error in reading from the session for device {} ", netconfDeviceInfo, e);
296 throw new RuntimeException(new NetconfException("Error in reading from the session for device {}" +
297 netconfDeviceInfo, e));
298 //TODO should we send a socket closed message to listeners ?
299 }
300 }
301
302 private void close(String deviceReply) {
303 log.debug("Netconf device {} socketClosed = true DEVICE_UNREGISTERED {}",
304 netconfDeviceInfo, deviceReply);
305 NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
306 NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
307 null, null, Optional.of(-1), netconfDeviceInfo);
308 netconfDeviceEventListeners.forEach(
309 listener -> listener.event(event));
310 this.interrupt();
311 }
312
313 private void dealWithReply(String deviceReply) {
314 if (deviceReply.contains(RPC_REPLY) ||
315 deviceReply.contains(RPC_ERROR) ||
316 deviceReply.contains(HELLO)) {
317 log.debug("Netconf device {} sessionDelegate.notify() DEVICE_REPLY {} {}",
318 netconfDeviceInfo, getMsgId(deviceReply), deviceReply);
319 NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
320 NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
321 null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo);
322 sessionDelegate.notify(event);
323 netconfDeviceEventListeners.forEach(
324 listener -> listener.event(event));
325 } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
326 log.debug("Netconf device {} DEVICE_NOTIFICATION {} {} {}",
327 netconfDeviceInfo, enableNotifications,
328 getMsgId(deviceReply), deviceReply);
329 if (enableNotifications) {
330 log.debug("dispatching to {} listeners", netconfDeviceEventListeners.size());
331 final String finalDeviceReply = deviceReply;
332 netconfDeviceEventListeners.forEach(
333 listener -> listener.event(new NetconfDeviceOutputEvent(
334 NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
335 null, finalDeviceReply, getMsgId(finalDeviceReply),
336 netconfDeviceInfo)));
337 }
338 } else {
339 log.debug("Error on reply from device {} {}", netconfDeviceInfo, deviceReply);
340 }
341 }
342
343 static boolean validateChunkedFraming(String reply) {
344 Matcher matcher = CHUNKED_FRAMING_PATTERN.matcher(reply);
345 if (!matcher.matches()) {
346 log.debug("Error on reply {}", reply);
347 return false;
348 }
349 Matcher chunkM = CHUNKED_SIZE_PATTERN.matcher(reply);
350 List<MatchResult> chunks = new ArrayList<>();
351 String chunkdataStr = "";
352 while (chunkM.find()) {
353 chunks.add(chunkM.toMatchResult());
354 // extract chunk-data (and later) in bytes
355 int bytes = Integer.parseInt(chunkM.group(1));
356 byte[] chunkdata = reply.substring(chunkM.end()).getBytes(StandardCharsets.UTF_8);
357 if (bytes > chunkdata.length) {
358 log.debug("Error on reply - wrong chunk size {}", reply);
359 return false;
360 }
361 //check if after chunk-size bytes there is next chunk or message ending
362 if (chunkdata[bytes] != LF_CHAR || chunkdata[bytes + 1] != HASH_CHAR) {
363 log.debug("Error on reply - wrong chunk size {}", reply);
364 return false;
365 }
366 // convert (only) chunk-data part into String
367 chunkdataStr = new String(chunkdata, 0, bytes, StandardCharsets.UTF_8);
368 // skip chunk-data part from next match
369 chunkM.region(chunkM.end() + chunkdataStr.length(), reply.length());
370 }
371 if (!"\n##\n".equals(reply.substring(chunks.get(chunks.size() - 1).end() + chunkdataStr.length()))) {
372 log.debug("Error on reply {}", reply);
373 return false;
374 }
375 return true;
Andrea Campanella101417d2015-12-11 17:58:07 -0800376 }
377
Sean Condond2c8d472017-02-17 17:09:39 +0000378 protected static Optional<Integer> getMsgId(String reply) {
Konstantinos Kanonakis1b8b5592016-09-09 14:34:37 -0500379 Matcher matcher = MSGID_PATTERN.matcher(reply);
380 if (matcher.find()) {
Sean Condon7347de92017-07-21 12:17:25 +0100381 try {
382 return Optional.of(Integer.valueOf(matcher.group(1)));
383 } catch (NumberFormatException e) {
384 log.warn("Failed to parse message-id from {}", matcher.group(), e);
385 }
Konstantinos Kanonakis1b8b5592016-09-09 14:34:37 -0500386 }
387 if (reply.contains(HELLO)) {
Andrea Campanella7bbe7b12017-05-03 16:03:38 -0700388 return Optional.of(-1);
Andrea Campanella101417d2015-12-11 17:58:07 -0800389 }
Andrea Campanella1311ea02016-03-04 17:51:25 -0800390 return Optional.empty();
Andrea Campanella101417d2015-12-11 17:58:07 -0800391 }
392
Yuta HIGUCHI0454d702017-03-17 10:08:38 -0700393 @Override
Andrea Campanella101417d2015-12-11 17:58:07 -0800394 public void addDeviceEventListener(NetconfDeviceOutputEventListener listener) {
395 if (!netconfDeviceEventListeners.contains(listener)) {
396 netconfDeviceEventListeners.add(listener);
397 }
398 }
399
Yuta HIGUCHI0454d702017-03-17 10:08:38 -0700400 @Override
Andrea Campanella101417d2015-12-11 17:58:07 -0800401 public void removeDeviceEventListener(NetconfDeviceOutputEventListener listener) {
402 netconfDeviceEventListeners.remove(listener);
403 }
helenyrwu0407c642016-06-09 12:01:30 -0700404
Yuta HIGUCHI0454d702017-03-17 10:08:38 -0700405 @Override
helenyrwu0407c642016-06-09 12:01:30 -0700406 public void setEnableNotifications(boolean enableNotifications) {
407 this.enableNotifications = enableNotifications;
408 }
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200409}