blob: ec6346aa95f7347c0043a9f5fac3ba960d9af48a [file] [log] [blame]
Yuta HIGUCHI348b3232017-04-20 10:19:48 -07001/*
2 * Copyright 2015-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.netconf.ctl;
18
19import com.google.common.base.Preconditions;
20import com.google.common.collect.Lists;
21import org.onosproject.netconf.NetconfDeviceInfo;
22import org.onosproject.netconf.NetconfDeviceOutputEvent;
23import org.onosproject.netconf.NetconfDeviceOutputEventListener;
24import org.onosproject.netconf.NetconfException;
25import org.slf4j.Logger;
26import org.slf4j.LoggerFactory;
27
28import java.io.BufferedReader;
29import java.io.IOException;
30import java.io.InputStream;
31import java.io.InputStreamReader;
32import java.io.OutputStream;
33import java.io.PrintWriter;
34import java.util.List;
35import java.util.Map;
36import java.util.Optional;
37import java.util.concurrent.CompletableFuture;
38import java.util.regex.Matcher;
39import java.util.regex.Pattern;
40
41/**
42 * Thread that gets spawned each time a session is established and handles all the input
43 * and output from the session's streams to and from the NETCONF device the session is
44 * established with.
45 *
46 * @deprecated in 1.10.0
47 */
48@Deprecated
49public class NetconfStreamThread extends Thread implements NetconfStreamHandler {
50
51 private static final Logger log = LoggerFactory
52 .getLogger(NetconfStreamThread.class);
53 private static final String HELLO = "<hello";
54 private static final String END_PATTERN = "]]>]]>";
55 private static final String RPC_REPLY = "rpc-reply";
56 private static final String RPC_ERROR = "rpc-error";
57 private static final String NOTIFICATION_LABEL = "<notification";
58 private static final String MESSAGE_ID = "message-id=";
59 private static final Pattern MSGID_PATTERN = Pattern.compile(MESSAGE_ID + "\"(\\d+)\"");
60
61 private PrintWriter outputStream;
62 private final InputStream err;
63 private final InputStream in;
64 private NetconfDeviceInfo netconfDeviceInfo;
65 private NetconfSessionDelegate sessionDelegate;
66 private NetconfMessageState state;
67 private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
68 = Lists.newCopyOnWriteArrayList();
69 private boolean enableNotifications = true;
70 private Map<Integer, CompletableFuture<String>> replies;
71
72 public NetconfStreamThread(final InputStream in, final OutputStream out,
73 final InputStream err, NetconfDeviceInfo deviceInfo,
74 NetconfSessionDelegate delegate,
75 Map<Integer, CompletableFuture<String>> replies) {
76 this.in = in;
77 this.err = err;
78 outputStream = new PrintWriter(out);
79 netconfDeviceInfo = deviceInfo;
80 state = NetconfMessageState.NO_MATCHING_PATTERN;
81 sessionDelegate = delegate;
82 this.replies = replies;
83 log.debug("Stream thread for device {} session started", deviceInfo);
84 start();
85 }
86
87 @Override
88 public CompletableFuture<String> sendMessage(String request) {
89 Optional<Integer> messageId = getMsgId(request);
90 return sendMessage(request, messageId.get());
91 }
92
93 @Override
94 public CompletableFuture<String> sendMessage(String request, int messageId) {
95 log.debug("Sending message {} to device {}", request, netconfDeviceInfo);
96 CompletableFuture<String> cf = new CompletableFuture<>();
97 replies.put(messageId, cf);
98
99 synchronized (outputStream) {
100 outputStream.print(request);
101 outputStream.flush();
102 }
103
104 return cf;
105 }
106
107 public enum NetconfMessageState {
108
109 NO_MATCHING_PATTERN {
110 @Override
111 NetconfMessageState evaluateChar(char c) {
112 if (c == ']') {
113 return FIRST_BRACKET;
114 } else {
115 return this;
116 }
117 }
118 },
119 FIRST_BRACKET {
120 @Override
121 NetconfMessageState evaluateChar(char c) {
122 if (c == ']') {
123 return SECOND_BRACKET;
124 } else {
125 return NO_MATCHING_PATTERN;
126 }
127 }
128 },
129 SECOND_BRACKET {
130 @Override
131 NetconfMessageState evaluateChar(char c) {
132 if (c == '>') {
133 return FIRST_BIGGER;
134 } else {
135 return NO_MATCHING_PATTERN;
136 }
137 }
138 },
139 FIRST_BIGGER {
140 @Override
141 NetconfMessageState evaluateChar(char c) {
142 if (c == ']') {
143 return THIRD_BRACKET;
144 } else {
145 return NO_MATCHING_PATTERN;
146 }
147 }
148 },
149 THIRD_BRACKET {
150 @Override
151 NetconfMessageState evaluateChar(char c) {
152 if (c == ']') {
153 return ENDING_BIGGER;
154 } else {
155 return NO_MATCHING_PATTERN;
156 }
157 }
158 },
159 ENDING_BIGGER {
160 @Override
161 NetconfMessageState evaluateChar(char c) {
162 if (c == '>') {
163 return END_PATTERN;
164 } else {
165 return NO_MATCHING_PATTERN;
166 }
167 }
168 },
169 END_PATTERN {
170 @Override
171 NetconfMessageState evaluateChar(char c) {
172 return NO_MATCHING_PATTERN;
173 }
174 };
175
176 abstract NetconfMessageState evaluateChar(char c);
177 }
178
179 @Override
180 public void run() {
181 BufferedReader bufferReader = new BufferedReader(new InputStreamReader(in));
182 try {
183 boolean socketClosed = false;
184 StringBuilder deviceReplyBuilder = new StringBuilder();
185 while (!socketClosed) {
186 int cInt = bufferReader.read();
187 if (cInt == -1) {
188 log.debug("Netconf device {} sent error char in session," +
189 " will need to be reopend", netconfDeviceInfo);
190 NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
191 NetconfDeviceOutputEvent.Type.SESSION_CLOSED,
192 null, null, Optional.of(-1), netconfDeviceInfo);
193 netconfDeviceEventListeners.forEach(
194 listener -> listener.event(event));
195 socketClosed = true;
196 log.debug("Netconf device {} ERROR cInt == -1 socketClosed = true", netconfDeviceInfo);
197 }
198 char c = (char) cInt;
199 state = state.evaluateChar(c);
200 deviceReplyBuilder.append(c);
201 if (state == NetconfMessageState.END_PATTERN) {
202 String deviceReply = deviceReplyBuilder.toString();
203 if (deviceReply.equals(END_PATTERN)) {
204 socketClosed = true;
205 log.debug("Netconf device {} socketClosed = true DEVICE_UNREGISTERED {}",
206 netconfDeviceInfo, deviceReply);
207 NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
208 NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
209 null, null, Optional.of(-1), netconfDeviceInfo);
210 netconfDeviceEventListeners.forEach(
211 listener -> listener.event(event));
212 this.interrupt();
213 } else {
214 deviceReply = deviceReply.replace(END_PATTERN, "");
215 if (deviceReply.contains(RPC_REPLY) ||
216 deviceReply.contains(RPC_ERROR) ||
217 deviceReply.contains(HELLO)) {
218 log.debug("Netconf device {} sessionDelegate.notify() DEVICE_REPLY {} {}",
219 netconfDeviceInfo, getMsgId(deviceReply), deviceReply);
220 NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
221 NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
222 null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo);
223 sessionDelegate.notify(event);
224 netconfDeviceEventListeners.forEach(
225 listener -> listener.event(event));
226 } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
227 log.debug("Netconf device {} DEVICE_NOTIFICATION {} {} {}",
228 netconfDeviceInfo, enableNotifications,
229 getMsgId(deviceReply), deviceReply);
230 if (enableNotifications) {
231 log.debug("dispatching to {} listeners", netconfDeviceEventListeners.size());
232 final String finalDeviceReply = deviceReply;
233 netconfDeviceEventListeners.forEach(
234 listener -> listener.event(new NetconfDeviceOutputEvent(
235 NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
236 null, finalDeviceReply, getMsgId(finalDeviceReply),
237 netconfDeviceInfo)));
238 }
239 } else {
240 log.debug("Error on reply from device {} {}", netconfDeviceInfo, deviceReply);
241 }
242 deviceReplyBuilder.setLength(0);
243 }
244 }
245 }
246 } catch (IOException e) {
247 log.warn("Error in reading from the session for device {} ", netconfDeviceInfo, e);
248 throw new RuntimeException(new NetconfException("Error in reading from the session for device {}" +
249 netconfDeviceInfo, e));
250 //TODO should we send a socket closed message to listeners ?
251 }
252 }
253
254 protected static Optional<Integer> getMsgId(String reply) {
255 Matcher matcher = MSGID_PATTERN.matcher(reply);
256 if (matcher.find()) {
257 Integer messageId = Integer.parseInt(matcher.group(1));
258 Preconditions.checkNotNull(messageId, "Error in retrieving the message id");
259 return Optional.of(messageId);
260 }
261 if (reply.contains(HELLO)) {
262 return Optional.of(0);
263 }
264 return Optional.empty();
265 }
266
267 @Override
268 public void addDeviceEventListener(NetconfDeviceOutputEventListener listener) {
269 if (!netconfDeviceEventListeners.contains(listener)) {
270 netconfDeviceEventListeners.add(listener);
271 }
272 }
273
274 @Override
275 public void removeDeviceEventListener(NetconfDeviceOutputEventListener listener) {
276 netconfDeviceEventListeners.remove(listener);
277 }
278
279 @Override
280 public void setEnableNotifications(boolean enableNotifications) {
281 this.enableNotifications = enableNotifications;
282 }
283}