blob: 7a2fa8fc7742f4ec981336d81c491cf6933fe75e [file] [log] [blame]
Marc De Leenheer57a5af02016-12-02 20:54:41 -08001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.tl1.impl;
17
18import io.netty.bootstrap.Bootstrap;
19import io.netty.buffer.ByteBuf;
20import io.netty.buffer.Unpooled;
21import io.netty.channel.Channel;
22import io.netty.channel.ChannelFuture;
23import io.netty.channel.ChannelHandlerContext;
24import io.netty.channel.ChannelInitializer;
25import io.netty.channel.ChannelOption;
26import io.netty.channel.EventLoopGroup;
27import io.netty.channel.SimpleChannelInboundHandler;
28import io.netty.channel.nio.NioEventLoopGroup;
29import io.netty.channel.socket.SocketChannel;
30import io.netty.channel.socket.nio.NioSocketChannel;
31import io.netty.handler.codec.DelimiterBasedFrameDecoder;
32import io.netty.handler.codec.string.StringDecoder;
33import io.netty.util.CharsetUtil;
34import org.apache.commons.lang.StringUtils;
35import org.apache.felix.scr.annotations.Activate;
36import org.apache.felix.scr.annotations.Component;
37import org.apache.felix.scr.annotations.Deactivate;
38import org.apache.felix.scr.annotations.Reference;
39import org.apache.felix.scr.annotations.ReferenceCardinality;
40import org.apache.felix.scr.annotations.Service;
41import org.onlab.util.Tools;
42import org.onosproject.mastership.MastershipService;
43import org.onosproject.net.DeviceId;
44import org.onosproject.tl1.Tl1Command;
45import org.onosproject.tl1.Tl1Controller;
46import org.onosproject.tl1.Tl1Device;
47import org.onosproject.tl1.Tl1Listener;
48import org.slf4j.Logger;
49import org.slf4j.LoggerFactory;
50
51import java.nio.charset.Charset;
52import java.util.Arrays;
53import java.util.Collection;
54import java.util.Map;
55import java.util.Optional;
56import java.util.Set;
57import java.util.concurrent.CompletableFuture;
58import java.util.concurrent.ConcurrentHashMap;
59import java.util.concurrent.ConcurrentMap;
60import java.util.concurrent.CopyOnWriteArraySet;
61import java.util.concurrent.ExecutorService;
62import java.util.concurrent.Executors;
63import java.util.stream.Collectors;
64
65/**
66 * Implementation of TL1 controller.
67 *
68 * Handles the connection and input/output for all registered TL1 devices.
69 * Turn on debug logging if you want to see all message I/O.
70 *
71 * Per device, we track commands using a simple ctag-keyed map. This assumes the client is sending out unique ctag's.
72 */
73@Component(immediate = true)
74@Service
75public class DefaultTl1Controller implements Tl1Controller {
76 private final Logger log = LoggerFactory.getLogger(DefaultTl1Controller.class);
77
78 // TL1 message delimiter (semi colon)
79 private static final ByteBuf DELIMITER = Unpooled.copiedBuffer(new char[]{';'}, Charset.defaultCharset());
80 private static final String COMPLD = "COMPLD";
81 private static final String DENY = "DENY";
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected MastershipService mastershipService;
85
86 private ConcurrentMap<DeviceId, Tl1Device> deviceMap = new ConcurrentHashMap<>();
87 // Key: channel, value: map with key ctag, value: future TL1 msg (ctags are assumed unique per device)
88 private ConcurrentMap<Channel, ConcurrentMap<Integer, CompletableFuture<String>>> msgMap =
89 new ConcurrentHashMap<>();
90 private EventLoopGroup workerGroup = new NioEventLoopGroup();
91 private Set<Tl1Listener> tl1Listeners = new CopyOnWriteArraySet<>();
92 private ExecutorService executor;
93
94 @Activate
95 public void activate() {
96 executor = Executors.newFixedThreadPool(
97 Runtime.getRuntime().availableProcessors(),
98 Tools.groupedThreads("onos/tl1controller", "%d", log));
99 log.info("Started");
100 }
101
102 @Deactivate
103 public void deactivate() {
104 executor.shutdown();
105 deviceMap.clear();
106 msgMap.clear();
107 log.info("Stopped");
108 }
109
110 @Override
111 /**
112 * This implementation returns an empty string on failure.
113 */
114 public CompletableFuture<String> sendMsg(DeviceId deviceId, Tl1Command msg) {
115 log.debug("Sending TL1 message to device {}: {}", deviceId, msg);
116
117 Tl1Device device = deviceMap.get(deviceId);
118 if (device == null || !device.isConnected() || !mastershipService.isLocalMaster(deviceId)) {
119 return CompletableFuture.completedFuture(StringUtils.EMPTY);
120 }
121
122 // Create and store completable future, complete it in the channel handler when we receive a response
123 CompletableFuture<String> future = new CompletableFuture<>();
124 Channel channel = device.channel();
125 if (!msgMap.containsKey(channel)) {
126 return CompletableFuture.completedFuture(StringUtils.EMPTY);
127 }
128 msgMap.get(channel).put(msg.ctag(), future);
129
130 // Write message to channel
131 channel.writeAndFlush(Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8));
132
133 return future;
134 }
135
136 @Override
137 public Optional<Tl1Device> getDevice(DeviceId deviceId) {
138 return Optional.ofNullable(deviceMap.get(deviceId));
139 }
140
141 @Override
142 public boolean addDevice(DeviceId deviceId, Tl1Device device) {
143 log.debug("Adding TL1 device {} {}", deviceId);
144
145 // Ignore if device already known
146 if (deviceMap.containsKey(deviceId)) {
147 log.error("Ignoring duplicate device {}", deviceId);
148 return false;
149 }
150
151 deviceMap.put(deviceId, device);
152 return true;
153 }
154
155 @Override
156 public void connectDevice(DeviceId deviceId) {
157 Tl1Device device = deviceMap.get(deviceId);
158 if (device == null || device.isConnected()) {
159 return;
160 }
161
162 Bootstrap b = new Bootstrap();
163 b.group(workerGroup)
164 .channel(NioSocketChannel.class)
165 .option(ChannelOption.SO_KEEPALIVE, true)
166 .handler(new ChannelInitializer<SocketChannel>() {
167 @Override
168 protected void initChannel(SocketChannel socketChannel) throws Exception {
169 socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, DELIMITER));
170 socketChannel.pipeline().addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
171 // TODO
172 //socketChannel.pipeline().addLast(new Tl1Decoder());
173 socketChannel.pipeline().addLast(new Tl1InboundHandler());
174 }
175 })
176 .remoteAddress(device.ip().toInetAddress(), device.port())
177 .connect()
178 .addListener((ChannelFuture channelFuture) -> {
179 if (channelFuture.isSuccess()) {
180 msgMap.put(channelFuture.channel(), new ConcurrentHashMap<>());
181 device.connect(channelFuture.channel());
182 tl1Listeners.forEach(l -> executor.execute(() -> l.deviceConnected(deviceId)));
183 }
184 });
185 }
186
187 @Override
188 public void removeDevice(DeviceId deviceId) {
189 disconnectDevice(deviceId);
190 deviceMap.remove(deviceId);
191 }
192
193 @Override
194 public void addListener(Tl1Listener listener) {
195 tl1Listeners.add(listener);
196 }
197
198 @Override
199 public void removeListener(Tl1Listener listener) {
200 tl1Listeners.remove(listener);
201 }
202
203 @Override
204 public void disconnectDevice(DeviceId deviceId) {
205 // Ignore if unknown device
206 Tl1Device device = deviceMap.get(deviceId);
207 if (device == null) {
208 return;
209 }
210
211 Channel channel = device.channel();
212 if (channel != null) {
213 channel.close();
214 }
215
216 msgMap.remove(channel);
217 device.disconnect();
218 tl1Listeners.forEach(l -> l.deviceDisconnected(deviceId));
219 }
220
221 @Override
222 public Set<DeviceId> getDeviceIds() {
223 return deviceMap.keySet();
224 }
225
226 @Override
227 public Collection<Tl1Device> getDevices() {
228 return deviceMap.values();
229 }
230
231 /**
232 * Crude filtering handler that will only complete our stored future upon receiving a TL1 response messages.
233 */
234 private class Tl1InboundHandler extends SimpleChannelInboundHandler<String> {
235 @Override
236 protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
237 log.debug("Received TL1 message {}", s);
238
239 // Search for "COMPLD" or "DENY" to identify a TL1 response,
240 // then return the remainder of the string.
241 String[] words = s.split("\\s");
242 for (int i = 0; i < words.length; i++) {
243 String w = words[i];
244 if (w.startsWith(COMPLD) || w.startsWith(DENY)) {
245 // ctag is just in front of it
246 int ctag = Integer.parseInt(words[i - 1]);
247 // We return everything that follows to the caller (this will lose line breaks and such)
248 String result = Arrays.stream(words).skip(i + 1).collect(Collectors.joining());
249 // Set future when command is executed, good or bad
250 Map<Integer, CompletableFuture<String>> msg = msgMap.get(ctx.channel());
251 if (msg != null) {
252 CompletableFuture<String> f = msg.remove(ctag);
253 if (f != null) {
254 f.complete(result);
255 }
256 }
257
258 return;
259 }
260 }
261 }
262 }
263}