blob: b2f53f51b4940c833eba0150de997931452c0e85 [file] [log] [blame]
Sean Condond2c8d472017-02-17 17:09:39 +00001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.netconf.ctl;
17
18import java.io.BufferedReader;
19import java.io.EOFException;
20import java.io.IOException;
21import java.io.InputStream;
22import java.io.InputStreamReader;
23import java.io.OutputStream;
24import java.io.PrintWriter;
25import java.util.Collection;
26import java.util.Optional;
27import java.util.concurrent.ExecutorService;
28import java.util.concurrent.Future;
29
30import org.apache.sshd.common.NamedFactory;
31import org.apache.sshd.common.util.Buffer;
32import org.apache.sshd.common.util.ThreadUtils;
33import org.apache.sshd.server.Command;
34import org.apache.sshd.server.Environment;
35import org.apache.sshd.server.ExitCallback;
36import org.apache.sshd.server.SessionAware;
37import org.apache.sshd.server.session.ServerSession;
38import org.onosproject.netconf.ctl.NetconfStreamThread.NetconfMessageState;
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
42/**
43 * Mocks a NETCONF Device to test the NETCONF Southbound Interface etc.
44 *
45 * Implements the 'netconf' subsystem on Apache SSH (Mina).
46 * See SftpSubsystem for an example of another subsystem
47 */
48public class NetconfSshdTestSubsystem extends Thread implements Command, Runnable, SessionAware {
49
50 protected final Logger log = LoggerFactory.getLogger(getClass());
51
52 public static class Factory implements NamedFactory<Command> {
53
54 public static final String NAME = "netconf";
55
56 private final ExecutorService executors;
57 private final boolean shutdownExecutor;
58
59 public Factory() {
60 this(null);
61 }
62
63 /**
64 * @param executorService The {@link ExecutorService} to be used by
65 * the {@link SftpSubsystem} command when starting execution. If
66 * {@code null} then a single-threaded ad-hoc service is used.
67 * <B>Note:</B> the service will <U>not</U> be shutdown when the
68 * subsystem is closed - unless it is the ad-hoc service, which will be
69 * shutdown regardless
70 * @see Factory(ExecutorService, boolean)}
71 */
72 public Factory(ExecutorService executorService) {
73 this(executorService, false);
74 }
75
76 /**
77 * @param executorService The {@link ExecutorService} to be used by
78 * the {@link SftpSubsystem} command when starting execution. If
79 * {@code null} then a single-threaded ad-hoc service is used.
80 * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()}
81 * will be called when subsystem terminates - unless it is the ad-hoc
82 * service, which will be shutdown regardless
83 */
84 public Factory(ExecutorService executorService, boolean shutdownOnExit) {
85 executors = executorService;
86 shutdownExecutor = shutdownOnExit;
87 }
88
89 public ExecutorService getExecutorService() {
90 return executors;
91 }
92
93 public boolean isShutdownOnExit() {
94 return shutdownExecutor;
95 }
96
97 public Command create() {
98 return new NetconfSshdTestSubsystem(getExecutorService(), isShutdownOnExit());
99 }
100
101 public String getName() {
102 return NAME;
103 }
104 }
105
106 /**
107 * Properties key for the maximum of available open handles per session.
108 */
109 private static final String CLOSE_SESSION = "<close-session";
110 private static final String END_PATTERN = "]]>]]>";
111
112 private ExecutorService executors;
113 private boolean shutdownExecutor;
114 private ExitCallback callback;
115 private ServerSession session;
116 private InputStream in;
117 private OutputStream out;
118 private OutputStream err;
119 private Environment env;
120 private Future<?> pendingFuture;
121 private boolean closed = false;
122 private NetconfMessageState state;
123 private PrintWriter outputStream;
124
125 public NetconfSshdTestSubsystem() {
126 this(null);
127 }
128
129 /**
130 * @param executorService The {@link ExecutorService} to be used by
131 * the {@link SftpSubsystem} command when starting execution. If
132 * {@code null} then a single-threaded ad-hoc service is used.
133 * <b>Note:</b> the service will <U>not</U> be shutdown when the
134 * subsystem is closed - unless it is the ad-hoc service
135 * @see #SftpSubsystem(ExecutorService, boolean)
136 */
137 public NetconfSshdTestSubsystem(ExecutorService executorService) {
138 this(executorService, false);
139 }
140
141 /**
142 * @param executorService The {@link ExecutorService} to be used by
143 * the {@link SftpSubsystem} command when starting execution. If
144 * {@code null} then a single-threaded ad-hoc service is used.
145 * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()}
146 * will be called when subsystem terminates - unless it is the ad-hoc
147 * service, which will be shutdown regardless
148 * @see ThreadUtils#newSingleThreadExecutor(String)
149 */
150 public NetconfSshdTestSubsystem(ExecutorService executorService, boolean shutdownOnExit) {
151 executors = executorService;
152 if (executorService == null) {
153 executors = ThreadUtils.newSingleThreadExecutor(getClass().getSimpleName());
154 shutdownExecutor = true; // we always close the ad-hoc executor service
155 } else {
156 shutdownExecutor = shutdownOnExit;
157 }
158 }
159
160 @Override
161 public void setSession(ServerSession session) {
162 this.session = session;
163 }
164
165 @Override
166 public void run() {
167 BufferedReader bufferReader = new BufferedReader(new InputStreamReader(in));
168 boolean socketClosed = false;
169 try {
170 StringBuilder deviceRequestBuilder = new StringBuilder();
171 while (!socketClosed) {
172 int cInt = bufferReader.read();
173 if (cInt == -1) {
174 log.info("Netconf client sent error");
175 socketClosed = true;
176 }
177 char c = (char) cInt;
178 state = state.evaluateChar(c);
179 deviceRequestBuilder.append(c);
180 if (state == NetconfMessageState.END_PATTERN) {
181 String deviceRequest = deviceRequestBuilder.toString();
182 if (deviceRequest.equals(END_PATTERN)) {
183 socketClosed = true;
184 this.interrupt();
185 } else {
186 deviceRequest = deviceRequest.replace(END_PATTERN, "");
187 Optional<Integer> messageId = NetconfStreamThread.getMsgId(deviceRequest);
188 log.info("Client Request on session {}. MsgId {}: {}",
189 session.getId(), messageId, deviceRequest);
190 synchronized (outputStream) {
191 if (NetconfSessionImplTest.HELLO_REQ_PATTERN.matcher(deviceRequest).matches()) {
192 String helloReply =
193 NetconfSessionImplTest.getTestHelloReply(Optional.of(session.getId()));
194 outputStream.write(helloReply + END_PATTERN);
195 outputStream.flush();
196 } else if (NetconfSessionImplTest.EDIT_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()
197 || NetconfSessionImplTest.COPY_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()) {
198 outputStream.write(NetconfSessionImplTest.getOkReply(messageId) + END_PATTERN);
199 outputStream.flush();
200 } else if (NetconfSessionImplTest.GET_CONFIG_REQ_PATTERN.matcher(deviceRequest).matches()
201 || NetconfSessionImplTest.GET_REQ_PATTERN.matcher(deviceRequest).matches()) {
202 outputStream.write(NetconfSessionImplTest.getGetReply(messageId) + END_PATTERN);
203 outputStream.flush();
204 } else if (deviceRequest.contains(CLOSE_SESSION)) {
205 socketClosed = true;
206 outputStream.write(NetconfSessionImplTest.getOkReply(messageId) + END_PATTERN);
207 outputStream.flush();
208 } else {
209 log.error("Unexpected NETCONF message structure on session {} : {}",
210 session.getId(), deviceRequest);
211 }
212 }
213 deviceRequestBuilder.setLength(0);
214 }
215 }
216 }
217 } catch (Throwable t) {
218 if (!socketClosed && !(t instanceof EOFException)) { // Ignore
219 log.error("Exception caught in NETCONF Server subsystem", t.getMessage());
220 }
221 } finally {
222 try {
223 bufferReader.close();
224 } catch (IOException ioe) {
225 log.error("Could not close DataInputStream", ioe);
226 }
227
228 callback.onExit(0);
229 }
230 }
231
232 @Override
233 public void setInputStream(InputStream in) {
234 this.in = in;
235 }
236
237 @Override
238 public void setOutputStream(OutputStream out) {
239 this.out = out;
240 }
241
242 @Override
243 public void setErrorStream(OutputStream err) {
244 this.err = err;
245 }
246
247 @Override
248 public void setExitCallback(ExitCallback callback) {
249 this.callback = callback;
250 }
251
252 @Override
253 public void start(Environment env) throws IOException {
254 this.env = env;
255 state = NetconfMessageState.NO_MATCHING_PATTERN;
256 outputStream = new PrintWriter(out, false);
257 try {
258 pendingFuture = executors.submit(this);
259 } catch (RuntimeException e) { // e.g., RejectedExecutionException
260 log.error("Failed (" + e.getClass().getSimpleName() + ") to start command: " + e.getMessage(), e);
261 throw new IOException(e);
262 }
263 }
264
265 @Override
266 public void interrupt() {
267 // if thread has not completed, cancel it
268 if ((pendingFuture != null) && (!pendingFuture.isDone())) {
269 boolean result = pendingFuture.cancel(true);
270 // TODO consider waiting some reasonable (?) amount of time for cancellation
271 if (log.isDebugEnabled()) {
272 log.debug("interrupt() - cancel pending future=" + result);
273 }
274 }
275
276 pendingFuture = null;
277
278 if ((executors != null) && shutdownExecutor) {
279 Collection<Runnable> runners = executors.shutdownNow();
280 if (log.isDebugEnabled()) {
281 log.debug("interrupt() - shutdown executor service - runners count=" +
282 runners.size());
283 }
284 }
285
286 executors = null;
287
288 if (!closed) {
289 if (log.isDebugEnabled()) {
290 log.debug("interrupt() - mark as closed");
291 }
292
293 closed = true;
294 }
295 outputStream.close();
296 }
297
298 @Override
299 public void destroy() {
300 //Handled by interrupt
301 }
302
303 protected void process(Buffer buffer) throws IOException {
304 log.warn("Receieved buffer:" + buffer);
305 }
306}