blob: ab639e90096ac45552abd5c320d5372ea3f4d230 [file] [log] [blame]
Andrea Campanella101417d2015-12-11 17:58:07 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Andrea Campanella101417d2015-12-11 17:58:07 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Yuta HIGUCHIe3ae8212017-04-20 10:18:41 -070017package org.onosproject.netconf.ctl.impl;
Andrea Campanella101417d2015-12-11 17:58:07 -080018
Andrea Campanella101417d2015-12-11 17:58:07 -080019import com.google.common.collect.Lists;
20import org.onosproject.netconf.NetconfDeviceInfo;
21import org.onosproject.netconf.NetconfDeviceOutputEvent;
22import org.onosproject.netconf.NetconfDeviceOutputEventListener;
23import org.onosproject.netconf.NetconfException;
24import org.slf4j.Logger;
25import org.slf4j.LoggerFactory;
26
Kamil Stasiak9f59f442017-05-02 11:02:24 +020027
Andrea Campanella101417d2015-12-11 17:58:07 -080028import java.io.BufferedReader;
Andrea Campanella101417d2015-12-11 17:58:07 -080029import java.io.InputStream;
Andrea Campanella101417d2015-12-11 17:58:07 -080030import java.io.OutputStream;
Sean Condon7347de92017-07-21 12:17:25 +010031import java.io.OutputStreamWriter;
32import java.nio.charset.StandardCharsets;
Kamil Stasiak9f59f442017-05-02 11:02:24 +020033import java.io.InputStreamReader;
34import java.io.IOException;
35import java.io.UnsupportedEncodingException;
Andrea Campanella101417d2015-12-11 17:58:07 -080036import java.util.List;
Sean Condond2c8d472017-02-17 17:09:39 +000037import java.util.Map;
Kamil Stasiak9f59f442017-05-02 11:02:24 +020038import java.util.ArrayList;
Andreas Papazoisd4712e22016-02-10 15:59:55 +020039import java.util.Optional;
Andrea Campanella101417d2015-12-11 17:58:07 -080040import java.util.concurrent.CompletableFuture;
Kamil Stasiak9f59f442017-05-02 11:02:24 +020041import java.util.regex.MatchResult;
Konstantinos Kanonakis1b8b5592016-09-09 14:34:37 -050042import java.util.regex.Matcher;
43import java.util.regex.Pattern;
Andrea Campanella101417d2015-12-11 17:58:07 -080044
45/**
46 * Thread that gets spawned each time a session is established and handles all the input
47 * and output from the session's streams to and from the NETCONF device the session is
48 * established with.
49 */
50public class NetconfStreamThread extends Thread implements NetconfStreamHandler {
51
52 private static final Logger log = LoggerFactory
53 .getLogger(NetconfStreamThread.class);
Andrea Campanella1311ea02016-03-04 17:51:25 -080054 private static final String HELLO = "<hello";
Andrea Campanella101417d2015-12-11 17:58:07 -080055 private static final String END_PATTERN = "]]>]]>";
56 private static final String RPC_REPLY = "rpc-reply";
57 private static final String RPC_ERROR = "rpc-error";
heisenbergb7017d72016-04-13 02:16:07 -070058 private static final String NOTIFICATION_LABEL = "<notification";
Andreas Papazoisd4712e22016-02-10 15:59:55 +020059 private static final String MESSAGE_ID = "message-id=";
Konstantinos Kanonakis1b8b5592016-09-09 14:34:37 -050060 private static final Pattern MSGID_PATTERN = Pattern.compile(MESSAGE_ID + "\"(\\d+)\"");
Kamil Stasiak9f59f442017-05-02 11:02:24 +020061 private static final String MSGLEN_REGEX_PATTERN = "\n#\\d+\n";
62 // pattern to verify whole Chunked-Message format
63 private static final Pattern CHUNKED_FRAMING_PATTERN =
64 Pattern.compile("(\\n#([1-9][0-9]*)\\n(.+))+\\n##\\n", Pattern.DOTALL);
Kamil Stasiak9f59f442017-05-02 11:02:24 +020065 private static final String CHUNKED_END_REGEX_PATTERN = "\n##\n";
66 // pattern to parse each chunk-size in ChunkedMessage chunk
67 private static final Pattern CHUNKED_SIZE_PATTERN = Pattern.compile("\\n#([1-9][0-9]*)\\n");
Kamil Stasiak9f59f442017-05-02 11:02:24 +020068 private static final char HASH_CHAR = '#';
Kamil Stasiak9f59f442017-05-02 11:02:24 +020069 private static final char LF_CHAR = '\n';
Andrea Campanella101417d2015-12-11 17:58:07 -080070
Sean Condon7347de92017-07-21 12:17:25 +010071 private OutputStreamWriter outputStream;
Andrea Campanellab029b9e2016-01-29 11:05:36 -080072 private final InputStream err;
73 private final InputStream in;
74 private NetconfDeviceInfo netconfDeviceInfo;
75 private NetconfSessionDelegate sessionDelegate;
76 private NetconfMessageState state;
helenyrwu0407c642016-06-09 12:01:30 -070077 private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
78 = Lists.newCopyOnWriteArrayList();
79 private boolean enableNotifications = true;
Sean Condond2c8d472017-02-17 17:09:39 +000080 private Map<Integer, CompletableFuture<String>> replies;
Andrea Campanella101417d2015-12-11 17:58:07 -080081
82 public NetconfStreamThread(final InputStream in, final OutputStream out,
83 final InputStream err, NetconfDeviceInfo deviceInfo,
Sean Condond2c8d472017-02-17 17:09:39 +000084 NetconfSessionDelegate delegate,
85 Map<Integer, CompletableFuture<String>> replies) {
Andrea Campanellab029b9e2016-01-29 11:05:36 -080086 this.in = in;
87 this.err = err;
Sean Condon7347de92017-07-21 12:17:25 +010088 outputStream = new OutputStreamWriter(out, StandardCharsets.UTF_8);
Andrea Campanella101417d2015-12-11 17:58:07 -080089 netconfDeviceInfo = deviceInfo;
90 state = NetconfMessageState.NO_MATCHING_PATTERN;
91 sessionDelegate = delegate;
Sean Condond2c8d472017-02-17 17:09:39 +000092 this.replies = replies;
Andrea Campanella101417d2015-12-11 17:58:07 -080093 log.debug("Stream thread for device {} session started", deviceInfo);
94 start();
95 }
96
97 @Override
98 public CompletableFuture<String> sendMessage(String request) {
Sean Condond2c8d472017-02-17 17:09:39 +000099 Optional<Integer> messageId = getMsgId(request);
100 return sendMessage(request, messageId.get());
101 }
102
103 @Override
104 public CompletableFuture<String> sendMessage(String request, int messageId) {
Andrea Campanellab029b9e2016-01-29 11:05:36 -0800105 log.debug("Sending message {} to device {}", request, netconfDeviceInfo);
Sean Condond2c8d472017-02-17 17:09:39 +0000106 CompletableFuture<String> cf = new CompletableFuture<>();
107 replies.put(messageId, cf);
108
109 synchronized (outputStream) {
Sean Condon7347de92017-07-21 12:17:25 +0100110 try {
111 outputStream.write(request);
112 outputStream.flush();
113 } catch (IOException e) {
114 log.error("Writing to {} failed", netconfDeviceInfo, e);
115 cf.completeExceptionally(e);
116 }
Sean Condond2c8d472017-02-17 17:09:39 +0000117 }
118
119 return cf;
Andrea Campanella101417d2015-12-11 17:58:07 -0800120 }
121
122 public enum NetconfMessageState {
123
124 NO_MATCHING_PATTERN {
125 @Override
126 NetconfMessageState evaluateChar(char c) {
127 if (c == ']') {
Andrea Campanella1311ea02016-03-04 17:51:25 -0800128 return FIRST_BRACKET;
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200129 } else if (c == '\n') {
130 return FIRST_LF;
Andrea Campanella101417d2015-12-11 17:58:07 -0800131 } else {
132 return this;
133 }
134 }
135 },
Andrea Campanella1311ea02016-03-04 17:51:25 -0800136 FIRST_BRACKET {
Andrea Campanella101417d2015-12-11 17:58:07 -0800137 @Override
138 NetconfMessageState evaluateChar(char c) {
139 if (c == ']') {
Andrea Campanella1311ea02016-03-04 17:51:25 -0800140 return SECOND_BRACKET;
Andrea Campanella101417d2015-12-11 17:58:07 -0800141 } else {
142 return NO_MATCHING_PATTERN;
143 }
144 }
145 },
Andrea Campanella1311ea02016-03-04 17:51:25 -0800146 SECOND_BRACKET {
Andrea Campanella101417d2015-12-11 17:58:07 -0800147 @Override
148 NetconfMessageState evaluateChar(char c) {
149 if (c == '>') {
150 return FIRST_BIGGER;
151 } else {
152 return NO_MATCHING_PATTERN;
153 }
154 }
155 },
156 FIRST_BIGGER {
157 @Override
158 NetconfMessageState evaluateChar(char c) {
159 if (c == ']') {
Andrea Campanella1311ea02016-03-04 17:51:25 -0800160 return THIRD_BRACKET;
Andrea Campanella101417d2015-12-11 17:58:07 -0800161 } else {
162 return NO_MATCHING_PATTERN;
163 }
164 }
165 },
Andrea Campanella1311ea02016-03-04 17:51:25 -0800166 THIRD_BRACKET {
Andrea Campanella101417d2015-12-11 17:58:07 -0800167 @Override
168 NetconfMessageState evaluateChar(char c) {
169 if (c == ']') {
170 return ENDING_BIGGER;
171 } else {
172 return NO_MATCHING_PATTERN;
173 }
174 }
175 },
176 ENDING_BIGGER {
177 @Override
178 NetconfMessageState evaluateChar(char c) {
179 if (c == '>') {
180 return END_PATTERN;
181 } else {
182 return NO_MATCHING_PATTERN;
183 }
184 }
185 },
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200186 FIRST_LF {
187 @Override
188 NetconfMessageState evaluateChar(char c) {
189 if (c == '#') {
190 return FIRST_HASH;
191 } else if (c == ']') {
192 return FIRST_BRACKET;
193 } else if (c == '\n') {
194 return this;
195 } else {
196 return NO_MATCHING_PATTERN;
197 }
198 }
199 },
200 FIRST_HASH {
201 @Override
202 NetconfMessageState evaluateChar(char c) {
203 if (c == '#') {
204 return SECOND_HASH;
205 } else {
206 return NO_MATCHING_PATTERN;
207 }
208 }
209 },
210 SECOND_HASH {
211 @Override
212 NetconfMessageState evaluateChar(char c) {
213 if (c == '\n') {
214 return END_CHUNKED_PATTERN;
215 } else {
216 return NO_MATCHING_PATTERN;
217 }
218 }
219 },
220 END_CHUNKED_PATTERN {
221 @Override
222 NetconfMessageState evaluateChar(char c) {
223 return NO_MATCHING_PATTERN;
224 }
225 },
Andrea Campanella101417d2015-12-11 17:58:07 -0800226 END_PATTERN {
227 @Override
228 NetconfMessageState evaluateChar(char c) {
229 return NO_MATCHING_PATTERN;
230 }
231 };
232
233 abstract NetconfMessageState evaluateChar(char c);
234 }
235
Yuta HIGUCHI0454d702017-03-17 10:08:38 -0700236 @Override
Andrea Campanellab029b9e2016-01-29 11:05:36 -0800237 public void run() {
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200238 BufferedReader bufferReader = null;
239 while (bufferReader == null) {
Andrea Campanella101417d2015-12-11 17:58:07 -0800240 try {
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200241 bufferReader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
242 } catch (UnsupportedEncodingException e) {
Ray Milkeyba547f92018-02-01 15:22:31 -0800243 throw new IllegalStateException(e);
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200244 }
245 }
246
247 try {
248 boolean socketClosed = false;
249 StringBuilder deviceReplyBuilder = new StringBuilder();
250 while (!socketClosed) {
251 int cInt = bufferReader.read();
252 if (cInt == -1) {
253 log.debug("Netconf device {} sent error char in session," +
Andrea Campanella59227c32019-01-07 14:52:35 +0100254 " will need to be reopened", netconfDeviceInfo);
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200255 NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
256 NetconfDeviceOutputEvent.Type.SESSION_CLOSED,
257 null, null, Optional.of(-1), netconfDeviceInfo);
258 netconfDeviceEventListeners.forEach(
259 listener -> listener.event(event));
260 socketClosed = true;
261 log.debug("Netconf device {} ERROR cInt == -1 socketClosed = true", netconfDeviceInfo);
262 }
263 char c = (char) cInt;
264 state = state.evaluateChar(c);
265 deviceReplyBuilder.append(c);
266 if (state == NetconfMessageState.END_PATTERN) {
267 String deviceReply = deviceReplyBuilder.toString();
268 if (deviceReply.equals(END_PATTERN)) {
Andrea Campanella1311ea02016-03-04 17:51:25 -0800269 socketClosed = true;
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200270 close(deviceReply);
271 } else {
272 deviceReply = deviceReply.replace(END_PATTERN, "");
273 dealWithReply(deviceReply);
274 deviceReplyBuilder.setLength(0);
Andrea Campanella101417d2015-12-11 17:58:07 -0800275 }
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200276 } else if (state == NetconfMessageState.END_CHUNKED_PATTERN) {
277 String deviceReply = deviceReplyBuilder.toString();
278 if (!validateChunkedFraming(deviceReply)) {
279 log.debug("Netconf device {} send badly framed message {}",
280 netconfDeviceInfo, deviceReply);
281 socketClosed = true;
282 close(deviceReply);
283 } else {
284 deviceReply = deviceReply.replaceAll(MSGLEN_REGEX_PATTERN, "");
285 deviceReply = deviceReply.replaceAll(CHUNKED_END_REGEX_PATTERN, "");
286 dealWithReply(deviceReply);
287 deviceReplyBuilder.setLength(0);
Andrea Campanella101417d2015-12-11 17:58:07 -0800288 }
289 }
Andrea Campanella101417d2015-12-11 17:58:07 -0800290 }
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200291 } catch (IOException e) {
292 log.warn("Error in reading from the session for device {} ", netconfDeviceInfo, e);
Ray Milkey986a47a2018-01-25 11:38:51 -0800293 throw new IllegalStateException(new NetconfException("Error in reading from the session for device {}" +
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200294 netconfDeviceInfo, e));
295 //TODO should we send a socket closed message to listeners ?
296 }
297 }
298
299 private void close(String deviceReply) {
300 log.debug("Netconf device {} socketClosed = true DEVICE_UNREGISTERED {}",
301 netconfDeviceInfo, deviceReply);
302 NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
303 NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
304 null, null, Optional.of(-1), netconfDeviceInfo);
305 netconfDeviceEventListeners.forEach(
306 listener -> listener.event(event));
307 this.interrupt();
308 }
309
310 private void dealWithReply(String deviceReply) {
311 if (deviceReply.contains(RPC_REPLY) ||
312 deviceReply.contains(RPC_ERROR) ||
313 deviceReply.contains(HELLO)) {
314 log.debug("Netconf device {} sessionDelegate.notify() DEVICE_REPLY {} {}",
315 netconfDeviceInfo, getMsgId(deviceReply), deviceReply);
316 NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
317 NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
318 null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo);
319 sessionDelegate.notify(event);
320 netconfDeviceEventListeners.forEach(
321 listener -> listener.event(event));
322 } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
323 log.debug("Netconf device {} DEVICE_NOTIFICATION {} {} {}",
324 netconfDeviceInfo, enableNotifications,
325 getMsgId(deviceReply), deviceReply);
326 if (enableNotifications) {
327 log.debug("dispatching to {} listeners", netconfDeviceEventListeners.size());
328 final String finalDeviceReply = deviceReply;
329 netconfDeviceEventListeners.forEach(
330 listener -> listener.event(new NetconfDeviceOutputEvent(
331 NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
332 null, finalDeviceReply, getMsgId(finalDeviceReply),
333 netconfDeviceInfo)));
334 }
335 } else {
336 log.debug("Error on reply from device {} {}", netconfDeviceInfo, deviceReply);
337 }
338 }
339
340 static boolean validateChunkedFraming(String reply) {
341 Matcher matcher = CHUNKED_FRAMING_PATTERN.matcher(reply);
342 if (!matcher.matches()) {
343 log.debug("Error on reply {}", reply);
344 return false;
345 }
346 Matcher chunkM = CHUNKED_SIZE_PATTERN.matcher(reply);
347 List<MatchResult> chunks = new ArrayList<>();
348 String chunkdataStr = "";
349 while (chunkM.find()) {
350 chunks.add(chunkM.toMatchResult());
351 // extract chunk-data (and later) in bytes
352 int bytes = Integer.parseInt(chunkM.group(1));
353 byte[] chunkdata = reply.substring(chunkM.end()).getBytes(StandardCharsets.UTF_8);
354 if (bytes > chunkdata.length) {
355 log.debug("Error on reply - wrong chunk size {}", reply);
356 return false;
357 }
358 //check if after chunk-size bytes there is next chunk or message ending
359 if (chunkdata[bytes] != LF_CHAR || chunkdata[bytes + 1] != HASH_CHAR) {
360 log.debug("Error on reply - wrong chunk size {}", reply);
361 return false;
362 }
363 // convert (only) chunk-data part into String
364 chunkdataStr = new String(chunkdata, 0, bytes, StandardCharsets.UTF_8);
365 // skip chunk-data part from next match
366 chunkM.region(chunkM.end() + chunkdataStr.length(), reply.length());
367 }
368 if (!"\n##\n".equals(reply.substring(chunks.get(chunks.size() - 1).end() + chunkdataStr.length()))) {
369 log.debug("Error on reply {}", reply);
370 return false;
371 }
372 return true;
Andrea Campanella101417d2015-12-11 17:58:07 -0800373 }
374
Sean Condond2c8d472017-02-17 17:09:39 +0000375 protected static Optional<Integer> getMsgId(String reply) {
Konstantinos Kanonakis1b8b5592016-09-09 14:34:37 -0500376 Matcher matcher = MSGID_PATTERN.matcher(reply);
377 if (matcher.find()) {
Sean Condon7347de92017-07-21 12:17:25 +0100378 try {
379 return Optional.of(Integer.valueOf(matcher.group(1)));
380 } catch (NumberFormatException e) {
381 log.warn("Failed to parse message-id from {}", matcher.group(), e);
382 }
Konstantinos Kanonakis1b8b5592016-09-09 14:34:37 -0500383 }
384 if (reply.contains(HELLO)) {
Andrea Campanella7bbe7b12017-05-03 16:03:38 -0700385 return Optional.of(-1);
Andrea Campanella101417d2015-12-11 17:58:07 -0800386 }
Andrea Campanella1311ea02016-03-04 17:51:25 -0800387 return Optional.empty();
Andrea Campanella101417d2015-12-11 17:58:07 -0800388 }
389
Yuta HIGUCHI0454d702017-03-17 10:08:38 -0700390 @Override
Andrea Campanella101417d2015-12-11 17:58:07 -0800391 public void addDeviceEventListener(NetconfDeviceOutputEventListener listener) {
392 if (!netconfDeviceEventListeners.contains(listener)) {
393 netconfDeviceEventListeners.add(listener);
394 }
395 }
396
Yuta HIGUCHI0454d702017-03-17 10:08:38 -0700397 @Override
Andrea Campanella101417d2015-12-11 17:58:07 -0800398 public void removeDeviceEventListener(NetconfDeviceOutputEventListener listener) {
399 netconfDeviceEventListeners.remove(listener);
400 }
helenyrwu0407c642016-06-09 12:01:30 -0700401
Yuta HIGUCHI0454d702017-03-17 10:08:38 -0700402 @Override
helenyrwu0407c642016-06-09 12:01:30 -0700403 public void setEnableNotifications(boolean enableNotifications) {
404 this.enableNotifications = enableNotifications;
405 }
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200406}