blob: 6f0ccfec298562b0e32c6f8583196eedad620f9c [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.tl1.impl;
17
18import io.netty.bootstrap.Bootstrap;
19import io.netty.buffer.ByteBuf;
20import io.netty.buffer.Unpooled;
21import io.netty.channel.Channel;
22import io.netty.channel.ChannelFuture;
23import io.netty.channel.ChannelHandlerContext;
24import io.netty.channel.ChannelInitializer;
25import io.netty.channel.ChannelOption;
26import io.netty.channel.EventLoopGroup;
27import io.netty.channel.SimpleChannelInboundHandler;
28import io.netty.channel.nio.NioEventLoopGroup;
29import io.netty.channel.socket.SocketChannel;
30import io.netty.channel.socket.nio.NioSocketChannel;
31import io.netty.handler.codec.DelimiterBasedFrameDecoder;
32import io.netty.handler.codec.string.StringDecoder;
33import io.netty.util.CharsetUtil;
34import org.apache.commons.lang.StringUtils;
Marc De Leenheer57a5af02016-12-02 20:54:41 -080035import org.onlab.util.Tools;
36import org.onosproject.mastership.MastershipService;
37import org.onosproject.net.DeviceId;
38import org.onosproject.tl1.Tl1Command;
39import org.onosproject.tl1.Tl1Controller;
40import org.onosproject.tl1.Tl1Device;
41import org.onosproject.tl1.Tl1Listener;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070042import org.osgi.service.component.annotations.Activate;
43import org.osgi.service.component.annotations.Component;
44import org.osgi.service.component.annotations.Deactivate;
45import org.osgi.service.component.annotations.Reference;
46import org.osgi.service.component.annotations.ReferenceCardinality;
Marc De Leenheer57a5af02016-12-02 20:54:41 -080047import org.slf4j.Logger;
48import org.slf4j.LoggerFactory;
49
50import java.nio.charset.Charset;
51import java.util.Arrays;
52import java.util.Collection;
53import java.util.Map;
54import java.util.Optional;
55import java.util.Set;
56import java.util.concurrent.CompletableFuture;
57import java.util.concurrent.ConcurrentHashMap;
58import java.util.concurrent.ConcurrentMap;
59import java.util.concurrent.CopyOnWriteArraySet;
60import java.util.concurrent.ExecutorService;
61import java.util.concurrent.Executors;
62import java.util.stream.Collectors;
63
64/**
65 * Implementation of TL1 controller.
66 *
67 * Handles the connection and input/output for all registered TL1 devices.
68 * Turn on debug logging if you want to see all message I/O.
69 *
70 * Per device, we track commands using a simple ctag-keyed map. This assumes the client is sending out unique ctag's.
71 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070072@Component(immediate = true, service = Tl1Controller.class)
Marc De Leenheer57a5af02016-12-02 20:54:41 -080073public class DefaultTl1Controller implements Tl1Controller {
74 private final Logger log = LoggerFactory.getLogger(DefaultTl1Controller.class);
75
76 // TL1 message delimiter (semi colon)
77 private static final ByteBuf DELIMITER = Unpooled.copiedBuffer(new char[]{';'}, Charset.defaultCharset());
78 private static final String COMPLD = "COMPLD";
79 private static final String DENY = "DENY";
80
Ray Milkeyd84f89b2018-08-17 14:54:17 -070081 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Marc De Leenheer57a5af02016-12-02 20:54:41 -080082 protected MastershipService mastershipService;
83
84 private ConcurrentMap<DeviceId, Tl1Device> deviceMap = new ConcurrentHashMap<>();
85 // Key: channel, value: map with key ctag, value: future TL1 msg (ctags are assumed unique per device)
86 private ConcurrentMap<Channel, ConcurrentMap<Integer, CompletableFuture<String>>> msgMap =
87 new ConcurrentHashMap<>();
88 private EventLoopGroup workerGroup = new NioEventLoopGroup();
89 private Set<Tl1Listener> tl1Listeners = new CopyOnWriteArraySet<>();
90 private ExecutorService executor;
91
92 @Activate
93 public void activate() {
94 executor = Executors.newFixedThreadPool(
95 Runtime.getRuntime().availableProcessors(),
96 Tools.groupedThreads("onos/tl1controller", "%d", log));
97 log.info("Started");
98 }
99
100 @Deactivate
101 public void deactivate() {
102 executor.shutdown();
103 deviceMap.clear();
104 msgMap.clear();
105 log.info("Stopped");
106 }
107
108 @Override
109 /**
110 * This implementation returns an empty string on failure.
111 */
112 public CompletableFuture<String> sendMsg(DeviceId deviceId, Tl1Command msg) {
113 log.debug("Sending TL1 message to device {}: {}", deviceId, msg);
114
115 Tl1Device device = deviceMap.get(deviceId);
116 if (device == null || !device.isConnected() || !mastershipService.isLocalMaster(deviceId)) {
117 return CompletableFuture.completedFuture(StringUtils.EMPTY);
118 }
119
120 // Create and store completable future, complete it in the channel handler when we receive a response
121 CompletableFuture<String> future = new CompletableFuture<>();
122 Channel channel = device.channel();
123 if (!msgMap.containsKey(channel)) {
124 return CompletableFuture.completedFuture(StringUtils.EMPTY);
125 }
126 msgMap.get(channel).put(msg.ctag(), future);
127
128 // Write message to channel
129 channel.writeAndFlush(Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8));
130
131 return future;
132 }
133
134 @Override
135 public Optional<Tl1Device> getDevice(DeviceId deviceId) {
136 return Optional.ofNullable(deviceMap.get(deviceId));
137 }
138
139 @Override
140 public boolean addDevice(DeviceId deviceId, Tl1Device device) {
141 log.debug("Adding TL1 device {} {}", deviceId);
142
143 // Ignore if device already known
144 if (deviceMap.containsKey(deviceId)) {
145 log.error("Ignoring duplicate device {}", deviceId);
146 return false;
147 }
148
149 deviceMap.put(deviceId, device);
150 return true;
151 }
152
153 @Override
154 public void connectDevice(DeviceId deviceId) {
155 Tl1Device device = deviceMap.get(deviceId);
156 if (device == null || device.isConnected()) {
157 return;
158 }
159
160 Bootstrap b = new Bootstrap();
161 b.group(workerGroup)
162 .channel(NioSocketChannel.class)
163 .option(ChannelOption.SO_KEEPALIVE, true)
164 .handler(new ChannelInitializer<SocketChannel>() {
165 @Override
166 protected void initChannel(SocketChannel socketChannel) throws Exception {
167 socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, DELIMITER));
168 socketChannel.pipeline().addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
169 // TODO
170 //socketChannel.pipeline().addLast(new Tl1Decoder());
171 socketChannel.pipeline().addLast(new Tl1InboundHandler());
172 }
173 })
174 .remoteAddress(device.ip().toInetAddress(), device.port())
175 .connect()
176 .addListener((ChannelFuture channelFuture) -> {
177 if (channelFuture.isSuccess()) {
178 msgMap.put(channelFuture.channel(), new ConcurrentHashMap<>());
179 device.connect(channelFuture.channel());
180 tl1Listeners.forEach(l -> executor.execute(() -> l.deviceConnected(deviceId)));
181 }
182 });
183 }
184
185 @Override
186 public void removeDevice(DeviceId deviceId) {
187 disconnectDevice(deviceId);
188 deviceMap.remove(deviceId);
189 }
190
191 @Override
192 public void addListener(Tl1Listener listener) {
193 tl1Listeners.add(listener);
194 }
195
196 @Override
197 public void removeListener(Tl1Listener listener) {
198 tl1Listeners.remove(listener);
199 }
200
201 @Override
202 public void disconnectDevice(DeviceId deviceId) {
203 // Ignore if unknown device
204 Tl1Device device = deviceMap.get(deviceId);
205 if (device == null) {
206 return;
207 }
208
209 Channel channel = device.channel();
210 if (channel != null) {
211 channel.close();
rohitsharan1afdb3f2017-04-17 21:16:13 +0530212 msgMap.remove(channel);
Marc De Leenheer57a5af02016-12-02 20:54:41 -0800213 }
214
Marc De Leenheer57a5af02016-12-02 20:54:41 -0800215 device.disconnect();
216 tl1Listeners.forEach(l -> l.deviceDisconnected(deviceId));
217 }
218
219 @Override
220 public Set<DeviceId> getDeviceIds() {
221 return deviceMap.keySet();
222 }
223
224 @Override
225 public Collection<Tl1Device> getDevices() {
226 return deviceMap.values();
227 }
228
229 /**
230 * Crude filtering handler that will only complete our stored future upon receiving a TL1 response messages.
231 */
232 private class Tl1InboundHandler extends SimpleChannelInboundHandler<String> {
233 @Override
234 protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
235 log.debug("Received TL1 message {}", s);
236
237 // Search for "COMPLD" or "DENY" to identify a TL1 response,
238 // then return the remainder of the string.
239 String[] words = s.split("\\s");
240 for (int i = 0; i < words.length; i++) {
241 String w = words[i];
242 if (w.startsWith(COMPLD) || w.startsWith(DENY)) {
243 // ctag is just in front of it
244 int ctag = Integer.parseInt(words[i - 1]);
245 // We return everything that follows to the caller (this will lose line breaks and such)
Ray Milkey3717e602018-02-01 13:49:47 -0800246 String result = Arrays.stream(words).skip(i + 1L).collect(Collectors.joining());
Marc De Leenheer57a5af02016-12-02 20:54:41 -0800247 // Set future when command is executed, good or bad
248 Map<Integer, CompletableFuture<String>> msg = msgMap.get(ctx.channel());
249 if (msg != null) {
250 CompletableFuture<String> f = msg.remove(ctag);
251 if (f != null) {
252 f.complete(result);
253 }
254 }
255
256 return;
257 }
258 }
259 }
260 }
261}