blob: 6169c3dc06846d0190945c1812a0384d7ce436e4 [file] [log] [blame]
Andrea Campanella101417d2015-12-11 17:58:07 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.netconf.ctl;
18
19import com.google.common.base.Preconditions;
20import com.google.common.collect.Lists;
21import org.onosproject.netconf.NetconfDeviceInfo;
22import org.onosproject.netconf.NetconfDeviceOutputEvent;
23import org.onosproject.netconf.NetconfDeviceOutputEventListener;
24import org.onosproject.netconf.NetconfException;
25import org.slf4j.Logger;
26import org.slf4j.LoggerFactory;
27
28import java.io.BufferedReader;
29import java.io.IOException;
30import java.io.InputStream;
31import java.io.InputStreamReader;
32import java.io.OutputStream;
33import java.io.PrintWriter;
34import java.util.List;
35import java.util.concurrent.CompletableFuture;
36
37/**
38 * Thread that gets spawned each time a session is established and handles all the input
39 * and output from the session's streams to and from the NETCONF device the session is
40 * established with.
41 */
42public class NetconfStreamThread extends Thread implements NetconfStreamHandler {
43
44 private static final Logger log = LoggerFactory
45 .getLogger(NetconfStreamThread.class);
46 private static final String HELLO = "hello";
47 private static final String END_PATTERN = "]]>]]>";
48 private static final String RPC_REPLY = "rpc-reply";
49 private static final String RPC_ERROR = "rpc-error";
50 private static final String NOTIFICATION_LABEL = "<notification>";
51
52 private static PrintWriter outputStream;
53 private static NetconfDeviceInfo netconfDeviceInfo;
54 private static NetconfSessionDelegate sessionDelegate;
55 private static NetconfMessageState state;
56 private static List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
57 = Lists.newArrayList();
58
59 public NetconfStreamThread(final InputStream in, final OutputStream out,
60 final InputStream err, NetconfDeviceInfo deviceInfo,
61 NetconfSessionDelegate delegate) {
62 super(handler(in, err));
63 outputStream = new PrintWriter(out);
64 netconfDeviceInfo = deviceInfo;
65 state = NetconfMessageState.NO_MATCHING_PATTERN;
66 sessionDelegate = delegate;
67 log.debug("Stream thread for device {} session started", deviceInfo);
68 start();
69 }
70
71 @Override
72 public CompletableFuture<String> sendMessage(String request) {
73 outputStream.print(request);
74 outputStream.flush();
75 return new CompletableFuture<>();
76 }
77
78 public enum NetconfMessageState {
79
80 NO_MATCHING_PATTERN {
81 @Override
82 NetconfMessageState evaluateChar(char c) {
83 if (c == ']') {
84 return FIRST_BRAKET;
85 } else {
86 return this;
87 }
88 }
89 },
90 FIRST_BRAKET {
91 @Override
92 NetconfMessageState evaluateChar(char c) {
93 if (c == ']') {
94 return SECOND_BRAKET;
95 } else {
96 return NO_MATCHING_PATTERN;
97 }
98 }
99 },
100 SECOND_BRAKET {
101 @Override
102 NetconfMessageState evaluateChar(char c) {
103 if (c == '>') {
104 return FIRST_BIGGER;
105 } else {
106 return NO_MATCHING_PATTERN;
107 }
108 }
109 },
110 FIRST_BIGGER {
111 @Override
112 NetconfMessageState evaluateChar(char c) {
113 if (c == ']') {
114 return THIRD_BRAKET;
115 } else {
116 return NO_MATCHING_PATTERN;
117 }
118 }
119 },
120 THIRD_BRAKET {
121 @Override
122 NetconfMessageState evaluateChar(char c) {
123 if (c == ']') {
124 return ENDING_BIGGER;
125 } else {
126 return NO_MATCHING_PATTERN;
127 }
128 }
129 },
130 ENDING_BIGGER {
131 @Override
132 NetconfMessageState evaluateChar(char c) {
133 if (c == '>') {
134 return END_PATTERN;
135 } else {
136 return NO_MATCHING_PATTERN;
137 }
138 }
139 },
140 END_PATTERN {
141 @Override
142 NetconfMessageState evaluateChar(char c) {
143 return NO_MATCHING_PATTERN;
144 }
145 };
146
147 abstract NetconfMessageState evaluateChar(char c);
148 }
149
150 private static Runnable handler(final InputStream in, final InputStream err) {
151 BufferedReader bufferReader = new BufferedReader(new InputStreamReader(in));
152 return () -> {
153 try {
154 boolean socketClosed = false;
155 StringBuilder deviceReplyBuilder = new StringBuilder();
156 while (!socketClosed) {
157 int cInt = bufferReader.read();
158 if (cInt == -1) {
159 socketClosed = true;
160 NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
161 NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
162 null, null, -1, netconfDeviceInfo);
163 netconfDeviceEventListeners.forEach(
164 listener -> listener.event(event));
165 }
166 char c = (char) cInt;
167 state = state.evaluateChar(c);
168 deviceReplyBuilder.append(c);
169 if (state == NetconfMessageState.END_PATTERN) {
170 String deviceReply = deviceReplyBuilder.toString()
171 .replace(END_PATTERN, "");
172 if (deviceReply.contains(RPC_REPLY) ||
173 deviceReply.contains(RPC_ERROR) ||
174 deviceReply.contains(HELLO)) {
175 NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
176 NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
177 null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo);
178 sessionDelegate.notify(event);
179 netconfDeviceEventListeners.forEach(
180 listener -> listener.event(event));
181 } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
182 final String finalDeviceReply = deviceReply;
183 netconfDeviceEventListeners.forEach(
184 listener -> listener.event(new NetconfDeviceOutputEvent(
185 NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
186 null, finalDeviceReply, getMsgId(finalDeviceReply), netconfDeviceInfo)));
187 } else {
188 log.info("Error on replay from device {} ", deviceReply);
189 }
190 deviceReplyBuilder.setLength(0);
191 }
192 }
193 } catch (IOException e) {
194 log.warn("Error in reading from the session for device " + netconfDeviceInfo, e);
195 throw new RuntimeException(new NetconfException("Error in reading from the session for device {}" +
196 netconfDeviceInfo, e));
197 //TODO should we send a socket closed message to listeners ?
198 }
199 };
200 }
201
202 private static int getMsgId(String reply) {
203 if (!reply.contains(HELLO)) {
204 String[] outer = reply.split("message-id=");
205 Preconditions.checkArgument(outer.length != 1,
206 "Error in retrieving the message id");
207 String messageID = outer[1].substring(0, 3).replace("\"", "");
208 Preconditions.checkNotNull(Integer.parseInt(messageID),
209 "Error in retrieving the message id");
210 return Integer.parseInt(messageID);
211 } else {
212 return 0;
213 }
214 }
215
216 public void addDeviceEventListener(NetconfDeviceOutputEventListener listener) {
217 if (!netconfDeviceEventListeners.contains(listener)) {
218 netconfDeviceEventListeners.add(listener);
219 }
220 }
221
222 public void removeDeviceEventListener(NetconfDeviceOutputEventListener listener) {
223 netconfDeviceEventListeners.remove(listener);
224 }
225}