blob: 27e424744fd4cb213658b90cb391a02cb0a3dae5 [file] [log] [blame]
Sean Condond2c8d472017-02-17 17:09:39 +00001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Sean Condond2c8d472017-02-17 17:09:39 +00003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Yuta HIGUCHIe3ae8212017-04-20 10:18:41 -070016package org.onosproject.netconf.ctl.impl;
Sean Condond2c8d472017-02-17 17:09:39 +000017
Kamil Stasiak9f59f442017-05-02 11:02:24 +020018
Sean Condond2c8d472017-02-17 17:09:39 +000019import java.io.BufferedReader;
Sean Condond2c8d472017-02-17 17:09:39 +000020import java.io.InputStream;
Sean Condond2c8d472017-02-17 17:09:39 +000021import java.io.OutputStream;
22import java.io.PrintWriter;
Kamil Stasiak9f59f442017-05-02 11:02:24 +020023import java.io.UnsupportedEncodingException;
24import java.io.IOException;
25import java.io.InputStreamReader;
26import java.io.EOFException;
Andrea Campanella7bbe7b12017-05-03 16:03:38 -070027import java.nio.Buffer;
28import java.nio.ByteBuffer;
Sean Condond2c8d472017-02-17 17:09:39 +000029import java.util.Collection;
30import java.util.Optional;
31import java.util.concurrent.ExecutorService;
32import java.util.concurrent.Future;
33
Kamil Stasiak9f59f442017-05-02 11:02:24 +020034import org.apache.commons.lang3.tuple.Pair;
Sean Condond2c8d472017-02-17 17:09:39 +000035import org.apache.sshd.common.NamedFactory;
Andrea Campanella7bbe7b12017-05-03 16:03:38 -070036import org.apache.sshd.common.util.threads.ThreadUtils;
Sean Condond2c8d472017-02-17 17:09:39 +000037import org.apache.sshd.server.Command;
38import org.apache.sshd.server.Environment;
39import org.apache.sshd.server.ExitCallback;
40import org.apache.sshd.server.SessionAware;
41import org.apache.sshd.server.session.ServerSession;
Yuta HIGUCHIe3ae8212017-04-20 10:18:41 -070042import org.onosproject.netconf.ctl.impl.NetconfStreamThread.NetconfMessageState;
Sean Condond2c8d472017-02-17 17:09:39 +000043import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
46/**
47 * Mocks a NETCONF Device to test the NETCONF Southbound Interface etc.
48 *
49 * Implements the 'netconf' subsystem on Apache SSH (Mina).
50 * See SftpSubsystem for an example of another subsystem
51 */
52public class NetconfSshdTestSubsystem extends Thread implements Command, Runnable, SessionAware {
53
54 protected final Logger log = LoggerFactory.getLogger(getClass());
55
56 public static class Factory implements NamedFactory<Command> {
57
58 public static final String NAME = "netconf";
59
60 private final ExecutorService executors;
61 private final boolean shutdownExecutor;
62
63 public Factory() {
64 this(null);
65 }
66
67 /**
68 * @param executorService The {@link ExecutorService} to be used by
69 * the {@link SftpSubsystem} command when starting execution. If
70 * {@code null} then a single-threaded ad-hoc service is used.
71 * <B>Note:</B> the service will <U>not</U> be shutdown when the
72 * subsystem is closed - unless it is the ad-hoc service, which will be
73 * shutdown regardless
74 * @see Factory(ExecutorService, boolean)}
75 */
76 public Factory(ExecutorService executorService) {
77 this(executorService, false);
78 }
79
80 /**
81 * @param executorService The {@link ExecutorService} to be used by
82 * the {@link SftpSubsystem} command when starting execution. If
83 * {@code null} then a single-threaded ad-hoc service is used.
84 * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()}
85 * will be called when subsystem terminates - unless it is the ad-hoc
86 * service, which will be shutdown regardless
87 */
88 public Factory(ExecutorService executorService, boolean shutdownOnExit) {
89 executors = executorService;
90 shutdownExecutor = shutdownOnExit;
91 }
92
93 public ExecutorService getExecutorService() {
94 return executors;
95 }
96
97 public boolean isShutdownOnExit() {
98 return shutdownExecutor;
99 }
100
Yuta HIGUCHIe3ae8212017-04-20 10:18:41 -0700101 @Override
Sean Condond2c8d472017-02-17 17:09:39 +0000102 public Command create() {
103 return new NetconfSshdTestSubsystem(getExecutorService(), isShutdownOnExit());
104 }
105
Yuta HIGUCHIe3ae8212017-04-20 10:18:41 -0700106 @Override
Sean Condond2c8d472017-02-17 17:09:39 +0000107 public String getName() {
108 return NAME;
109 }
110 }
111
112 /**
113 * Properties key for the maximum of available open handles per session.
114 */
115 private static final String CLOSE_SESSION = "<close-session";
116 private static final String END_PATTERN = "]]>]]>";
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200117 private static final String HASH = "#";
118 private static final String LF = "\n";
119 private static final String MSGLEN_REGEX_PATTERN = "\n#\\d+\n";
120 private static final String MSGLEN_PART_REGEX_PATTERN = "\\d+\n";
121 private static final String CHUNKED_END_REGEX_PATTERN = "\n##\n";
Sean Condond2c8d472017-02-17 17:09:39 +0000122
123 private ExecutorService executors;
124 private boolean shutdownExecutor;
125 private ExitCallback callback;
126 private ServerSession session;
127 private InputStream in;
128 private OutputStream out;
129 private OutputStream err;
130 private Environment env;
131 private Future<?> pendingFuture;
132 private boolean closed = false;
133 private NetconfMessageState state;
134 private PrintWriter outputStream;
135
136 public NetconfSshdTestSubsystem() {
137 this(null);
138 }
139
140 /**
141 * @param executorService The {@link ExecutorService} to be used by
142 * the {@link SftpSubsystem} command when starting execution. If
143 * {@code null} then a single-threaded ad-hoc service is used.
144 * <b>Note:</b> the service will <U>not</U> be shutdown when the
145 * subsystem is closed - unless it is the ad-hoc service
146 * @see #SftpSubsystem(ExecutorService, boolean)
147 */
148 public NetconfSshdTestSubsystem(ExecutorService executorService) {
149 this(executorService, false);
150 }
151
152 /**
153 * @param executorService The {@link ExecutorService} to be used by
154 * the {@link SftpSubsystem} command when starting execution. If
155 * {@code null} then a single-threaded ad-hoc service is used.
156 * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()}
157 * will be called when subsystem terminates - unless it is the ad-hoc
158 * service, which will be shutdown regardless
159 * @see ThreadUtils#newSingleThreadExecutor(String)
160 */
161 public NetconfSshdTestSubsystem(ExecutorService executorService, boolean shutdownOnExit) {
162 executors = executorService;
163 if (executorService == null) {
164 executors = ThreadUtils.newSingleThreadExecutor(getClass().getSimpleName());
165 shutdownExecutor = true; // we always close the ad-hoc executor service
166 } else {
167 shutdownExecutor = shutdownOnExit;
168 }
169 }
170
171 @Override
172 public void setSession(ServerSession session) {
173 this.session = session;
174 }
175
176 @Override
177 public void run() {
178 BufferedReader bufferReader = new BufferedReader(new InputStreamReader(in));
179 boolean socketClosed = false;
180 try {
181 StringBuilder deviceRequestBuilder = new StringBuilder();
182 while (!socketClosed) {
183 int cInt = bufferReader.read();
184 if (cInt == -1) {
185 log.info("Netconf client sent error");
186 socketClosed = true;
187 }
188 char c = (char) cInt;
189 state = state.evaluateChar(c);
190 deviceRequestBuilder.append(c);
191 if (state == NetconfMessageState.END_PATTERN) {
192 String deviceRequest = deviceRequestBuilder.toString();
193 if (deviceRequest.equals(END_PATTERN)) {
194 socketClosed = true;
195 this.interrupt();
196 } else {
197 deviceRequest = deviceRequest.replace(END_PATTERN, "");
198 Optional<Integer> messageId = NetconfStreamThread.getMsgId(deviceRequest);
199 log.info("Client Request on session {}. MsgId {}: {}",
Andrea Campanella7bbe7b12017-05-03 16:03:38 -0700200 session.getSessionId(), messageId, deviceRequest);
Sean Condond2c8d472017-02-17 17:09:39 +0000201 synchronized (outputStream) {
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200202
Sean Condond2c8d472017-02-17 17:09:39 +0000203 if (NetconfSessionImplTest.HELLO_REQ_PATTERN.matcher(deviceRequest).matches()) {
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200204
Sean Condond2c8d472017-02-17 17:09:39 +0000205 String helloReply =
Andrea Campanella7bbe7b12017-05-03 16:03:38 -0700206 NetconfSessionImplTest.getTestHelloReply(Optional.of(ByteBuffer.wrap(
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200207 session.getSessionId()).asLongBuffer().get()), false);
Sean Condond2c8d472017-02-17 17:09:39 +0000208 outputStream.write(helloReply + END_PATTERN);
209 outputStream.flush();
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200210 } else if (NetconfSessionImplTest.HELLO_REQ_PATTERN_1_1.matcher(deviceRequest).matches()) {
211
212 String helloReply =
213 NetconfSessionImplTest.getTestHelloReply(Optional.of(ByteBuffer.wrap(
214 session.getSessionId()).asLongBuffer().get()), true);
215 outputStream.write(helloReply + END_PATTERN);
Sean Condond2c8d472017-02-17 17:09:39 +0000216 outputStream.flush();
217 } else {
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200218 Pair replyClosedPair = dealWithRequest(deviceRequest, messageId);
219 String reply = (String) replyClosedPair.getLeft();
220 if (reply != null) {
221 Boolean newSockedClosed = (Boolean) replyClosedPair.getRight();
222 socketClosed = newSockedClosed.booleanValue();
223 outputStream.write(reply + END_PATTERN);
224 outputStream.flush();
225 }
Sean Condond2c8d472017-02-17 17:09:39 +0000226 }
227 }
228 deviceRequestBuilder.setLength(0);
229 }
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200230 } else if (state == NetconfMessageState.END_CHUNKED_PATTERN) {
231 String deviceRequest = deviceRequestBuilder.toString();
232 if (!validateChunkedFraming(deviceRequest)) {
233 log.error("Netconf client send badly framed message {}",
234 deviceRequest);
235 } else {
236 deviceRequest = deviceRequest.replaceAll(MSGLEN_REGEX_PATTERN, "");
237 deviceRequest = deviceRequest.replaceAll(CHUNKED_END_REGEX_PATTERN, "");
238 Optional<Integer> messageId = NetconfStreamThread.getMsgId(deviceRequest);
239 log.info("Client Request on session {}. MsgId {}: {}",
240 session.getSessionId(), messageId, deviceRequest);
241
242 synchronized (outputStream) {
243
244 if (NetconfSessionImplTest.HELLO_REQ_PATTERN.matcher(deviceRequest).matches()) {
245 String helloReply =
246 NetconfSessionImplTest.getTestHelloReply(Optional.of(ByteBuffer.wrap(
247 session.getSessionId()).asLongBuffer().get()), true);
248 outputStream.write(helloReply + END_PATTERN);
249 outputStream.flush();
250 } else {
251 Pair replyClosedPair = dealWithRequest(deviceRequest, messageId);
252 String reply = (String) replyClosedPair.getLeft();
253 if (reply != null) {
254 Boolean newSockedClosed = (Boolean) replyClosedPair.getRight();
255 socketClosed = newSockedClosed.booleanValue();
256 outputStream.write(formatChunkedMessage(reply));
257 outputStream.flush();
258 }
259 }
260 }
261
262 }
263 deviceRequestBuilder.setLength(0);
Sean Condond2c8d472017-02-17 17:09:39 +0000264 }
265 }
266 } catch (Throwable t) {
267 if (!socketClosed && !(t instanceof EOFException)) { // Ignore
268 log.error("Exception caught in NETCONF Server subsystem", t.getMessage());
269 }
270 } finally {
271 try {
272 bufferReader.close();
273 } catch (IOException ioe) {
274 log.error("Could not close DataInputStream", ioe);
275 }
276
277 callback.onExit(0);
278 }
279 }
280
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200281 private boolean validateChunkedFraming(String reply) {
282 String[] strs = reply.split(LF + HASH);
283 int strIndex = 0;
284 while (strIndex < strs.length) {
285 String str = strs[strIndex];
286 if ((str.equals(HASH + LF))) {
287 return true;
288 }
289 if (!str.equals("")) {
290 try {
291 if (str.equals(LF)) {
292 return false;
293 }
294 int len = Integer.parseInt(str.split(LF)[0]);
295 if (str.split(MSGLEN_PART_REGEX_PATTERN)[1].getBytes("UTF-8").length != len) {
296 return false;
297 }
298 } catch (NumberFormatException e) {
299 return false;
300 } catch (UnsupportedEncodingException e) {
301 e.printStackTrace();
302 }
303 }
304 strIndex++;
305 }
306 return true;
307 }
308
309 private Pair<String, Boolean> dealWithRequest(String deviceRequest, Optional<Integer> messageId) {
310 if (NetconfSessionImplTest.EDIT_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()
311 || NetconfSessionImplTest.COPY_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()
312 || NetconfSessionImplTest.LOCK_REQ_PATTERN.matcher(deviceRequest).matches()
313 || NetconfSessionImplTest.UNLOCK_REQ_PATTERN.matcher(deviceRequest).matches()) {
314 return Pair.of(NetconfSessionImplTest.getOkReply(messageId), false);
315
316 } else if (NetconfSessionImplTest.GET_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()
317 || NetconfSessionImplTest.GET_REQ_PATTERN.matcher(deviceRequest).matches()) {
318 return Pair.of(NetconfSessionImplTest.getGetReply(messageId), false);
319 } else if (deviceRequest.contains(CLOSE_SESSION)) {
320 return Pair.of(NetconfSessionImplTest.getOkReply(messageId), true);
321 } else {
322 log.error("Unexpected NETCONF message structure on session {} : {}",
323 ByteBuffer.wrap(
324 session.getSessionId()).asLongBuffer().get(), deviceRequest);
325 return null;
326 }
327 }
328
329 private String formatChunkedMessage(String message) {
330 if (message.endsWith(END_PATTERN)) {
331 message = message.split(END_PATTERN)[0];
332 }
333 if (!message.startsWith(LF + HASH)) {
334 try {
335 message = LF + HASH + message.getBytes("UTF-8").length + LF + message + LF + HASH + HASH + LF;
336 } catch (UnsupportedEncodingException e) {
337 e.printStackTrace();
338 }
339 }
340 return message;
341 }
342
343
Sean Condond2c8d472017-02-17 17:09:39 +0000344 @Override
345 public void setInputStream(InputStream in) {
346 this.in = in;
347 }
348
349 @Override
350 public void setOutputStream(OutputStream out) {
351 this.out = out;
352 }
353
354 @Override
355 public void setErrorStream(OutputStream err) {
356 this.err = err;
357 }
358
359 @Override
360 public void setExitCallback(ExitCallback callback) {
361 this.callback = callback;
362 }
363
364 @Override
365 public void start(Environment env) throws IOException {
366 this.env = env;
367 state = NetconfMessageState.NO_MATCHING_PATTERN;
368 outputStream = new PrintWriter(out, false);
369 try {
370 pendingFuture = executors.submit(this);
371 } catch (RuntimeException e) { // e.g., RejectedExecutionException
372 log.error("Failed (" + e.getClass().getSimpleName() + ") to start command: " + e.getMessage(), e);
373 throw new IOException(e);
374 }
375 }
376
377 @Override
378 public void interrupt() {
379 // if thread has not completed, cancel it
380 if ((pendingFuture != null) && (!pendingFuture.isDone())) {
381 boolean result = pendingFuture.cancel(true);
382 // TODO consider waiting some reasonable (?) amount of time for cancellation
383 if (log.isDebugEnabled()) {
384 log.debug("interrupt() - cancel pending future=" + result);
385 }
386 }
387
388 pendingFuture = null;
389
390 if ((executors != null) && shutdownExecutor) {
391 Collection<Runnable> runners = executors.shutdownNow();
392 if (log.isDebugEnabled()) {
393 log.debug("interrupt() - shutdown executor service - runners count=" +
394 runners.size());
395 }
396 }
397
398 executors = null;
399
400 if (!closed) {
401 if (log.isDebugEnabled()) {
402 log.debug("interrupt() - mark as closed");
403 }
404
405 closed = true;
406 }
407 outputStream.close();
408 }
409
410 @Override
411 public void destroy() {
412 //Handled by interrupt
413 }
414
415 protected void process(Buffer buffer) throws IOException {
416 log.warn("Receieved buffer:" + buffer);
417 }
Kamil Stasiak9f59f442017-05-02 11:02:24 +0200418}