blob: 5409faeeaf2479cfbf28fb58071194697eed4bc6 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.bmv2.ctl;
18
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.CacheLoader;
21import com.google.common.cache.LoadingCache;
22import com.google.common.cache.RemovalListener;
23import com.google.common.cache.RemovalNotification;
24import com.google.common.collect.Maps;
25import org.apache.commons.lang3.tuple.Pair;
26import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.apache.thrift.TException;
33import org.apache.thrift.TProcessor;
34import org.apache.thrift.protocol.TBinaryProtocol;
35import org.apache.thrift.protocol.TMultiplexedProtocol;
36import org.apache.thrift.protocol.TProtocol;
37import org.apache.thrift.server.TThreadPoolServer;
38import org.apache.thrift.transport.TServerSocket;
39import org.apache.thrift.transport.TServerTransport;
40import org.apache.thrift.transport.TSocket;
41import org.apache.thrift.transport.TTransport;
42import org.apache.thrift.transport.TTransportException;
43import org.onlab.util.ImmutableByteSequence;
44import org.onosproject.bmv2.api.service.Bmv2Controller;
45import org.onosproject.bmv2.api.runtime.Bmv2Device;
46import org.onosproject.bmv2.api.runtime.Bmv2DeviceAgent;
47import org.onosproject.bmv2.api.service.Bmv2DeviceListener;
48import org.onosproject.bmv2.api.service.Bmv2PacketListener;
49import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
50import org.onosproject.bmv2.thriftapi.ControlPlaneService;
51import org.onosproject.bmv2.thriftapi.SimpleSwitch;
52import org.onosproject.bmv2.thriftapi.Standard;
53import org.onosproject.core.CoreService;
54import org.onosproject.net.DeviceId;
55import org.slf4j.Logger;
56import org.slf4j.LoggerFactory;
57
58import java.nio.ByteBuffer;
59import java.util.List;
60import java.util.Set;
61import java.util.concurrent.ConcurrentMap;
62import java.util.concurrent.CopyOnWriteArraySet;
63import java.util.concurrent.ExecutionException;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.Executors;
66import java.util.concurrent.TimeUnit;
67
68import static com.google.common.base.Preconditions.checkNotNull;
69import static org.onlab.util.Tools.groupedThreads;
70
71/**
72 * Default implementation of a BMv2 controller.
73 */
74@Component(immediate = true)
75@Service
76public class Bmv2ControllerImpl implements Bmv2Controller {
77
78 private static final String APP_ID = "org.onosproject.bmv2";
79
80 // Seconds after a client is expired (and connection closed) in the cache.
81 private static final int CLIENT_CACHE_TIMEOUT = 60;
82 // Number of connection retries after a network error.
83 private static final int NUM_CONNECTION_RETRIES = 2;
84 // Time between retries in milliseconds.
85 private static final int TIME_BETWEEN_RETRIES = 10;
86
87 private final Logger log = LoggerFactory.getLogger(this.getClass());
88
89 // Cache where clients are removed after a predefined timeout.
90 private final LoadingCache<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> agentCache =
91 CacheBuilder.newBuilder()
92 .expireAfterAccess(CLIENT_CACHE_TIMEOUT, TimeUnit.SECONDS)
93 .removalListener(new ClientRemovalListener())
94 .build(new ClientLoader());
95
96 private final InternalTrackingProcessor trackingProcessor = new InternalTrackingProcessor();
97
98 private final ExecutorService executorService = Executors
99 .newFixedThreadPool(16, groupedThreads("onos/bmv2", "controller", log));
100
101 private final Set<Bmv2DeviceListener> deviceListeners = new CopyOnWriteArraySet<>();
102 private final Set<Bmv2PacketListener> packetListeners = new CopyOnWriteArraySet<>();
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected CoreService coreService;
106
107 private TThreadPoolServer thriftServer;
108
109 // TODO: make it configurable trough component config
110 private int serverPort = DEFAULT_PORT;
111
112 @Activate
113 public void activate() {
114 coreService.registerApplication(APP_ID);
115 startServer(serverPort);
116 log.info("Activated");
117 }
118
119 @Deactivate
120 public void deactivate() {
121 stopServer();
122 log.info("Deactivated");
123 }
124
125 private void startServer(int port) {
126 try {
127 TServerTransport transport = new TServerSocket(port);
128 log.info("Starting server on port {}...", port);
129 this.thriftServer = new TThreadPoolServer(new TThreadPoolServer.Args(transport)
130 .processor(trackingProcessor)
131 .executorService(executorService));
132 executorService.execute(thriftServer::serve);
133 } catch (TTransportException e) {
134 log.error("Unable to start server", e);
135 }
136 }
137
138 private void stopServer() {
139 // Stop the server if running...
140 if (thriftServer != null && !thriftServer.isServing()) {
141 thriftServer.stop();
142 }
143 try {
144 executorService.shutdown();
145 executorService.awaitTermination(5, TimeUnit.SECONDS);
146 } catch (InterruptedException e) {
147 List<Runnable> runningTasks = executorService.shutdownNow();
148 log.warn("Unable to stop server threads: {}", runningTasks);
149 }
150 }
151
152 @Override
153 public Bmv2DeviceAgent getAgent(DeviceId deviceId) throws Bmv2RuntimeException {
154 try {
155 checkNotNull(deviceId, "deviceId cannot be null");
156 return agentCache.get(deviceId).getRight();
157 } catch (ExecutionException e) {
158 throw new Bmv2RuntimeException(e);
159 }
160 }
161
162 @Override
163 public boolean isReacheable(DeviceId deviceId) {
164 try {
165 return getAgent(deviceId).ping();
166 } catch (Bmv2RuntimeException e) {
167 return false;
168 }
169 }
170
171 @Override
172 public void addDeviceListener(Bmv2DeviceListener listener) {
173 if (!deviceListeners.contains(listener)) {
174 deviceListeners.add(listener);
175 }
176 }
177
178 @Override
179 public void removeDeviceListener(Bmv2DeviceListener listener) {
180 deviceListeners.remove(listener);
181 }
182
183 @Override
184 public void addPacketListener(Bmv2PacketListener listener) {
185 if (!packetListeners.contains(listener)) {
186 packetListeners.add(listener);
187 }
188 }
189
190 @Override
191 public void removePacketListener(Bmv2PacketListener listener) {
192 packetListeners.remove(listener);
193 }
194
195 /**
196 * Client cache removal listener. Close the connection on cache removal.
197 */
198 private static class ClientRemovalListener implements
199 RemovalListener<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> {
200
201 @Override
202 public void onRemoval(RemovalNotification<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> notification) {
203 // close the transport connection
204 Bmv2DeviceThriftClient client = notification.getValue().getRight();
205 TTransport transport = notification.getValue().getLeft();
206 // Locking here is ugly, but needed (see SafeThriftClient).
207 synchronized (transport) {
208 if (transport.isOpen()) {
209 transport.close();
210 }
211 }
212 }
213 }
214
215 /**
216 * Handles Thrift calls from BMv2 devices using registered listeners.
217 */
218 private final class InternalServiceHandler implements ControlPlaneService.Iface {
219
220 private final TSocket socket;
221 private Bmv2Device remoteDevice;
222
223 private InternalServiceHandler(TSocket socket) {
224 this.socket = socket;
225 }
226
227 @Override
228 public boolean ping() {
229 return true;
230 }
231
232 @Override
233 public void hello(int thriftServerPort, int deviceId, int instanceId, String jsonConfigMd5) {
234 // Locally note the remote device for future uses.
235 String host = socket.getSocket().getInetAddress().getHostAddress();
236 remoteDevice = new Bmv2Device(host, thriftServerPort, deviceId);
237
238 if (deviceListeners.size() == 0) {
239 log.debug("Received hello, but there's no listener registered.");
240 } else {
241 deviceListeners.forEach(
242 l -> executorService.execute(() -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5)));
243 }
244 }
245
246 @Override
Carmelo Cascone25f18882016-06-14 19:16:50 -0700247 public void packet_in(int port, ByteBuffer packet) {
Carmelo Cascone17fc9e42016-05-31 11:29:21 -0700248 if (remoteDevice == null) {
249 log.debug("Received packet-in, but the remote device is still unknown. Need a hello first...");
250 return;
251 }
252
253 if (packetListeners.size() == 0) {
254 log.debug("Received packet-in, but there's no listener registered.");
255 } else {
256 packetListeners.forEach(
257 l -> executorService.execute(() -> l.handlePacketIn(remoteDevice,
258 port,
Carmelo Cascone17fc9e42016-05-31 11:29:21 -0700259 ImmutableByteSequence.copyFrom(packet))));
260 }
261 }
262 }
263
264 /**
265 * Thrift Processor decorator. This class is needed in order to have access to the socket when handling a call.
266 * Socket is needed to get the IP address of the client originating the call (see InternalServiceHandler.hello())
267 */
268 private final class InternalTrackingProcessor implements TProcessor {
269
270 // Map sockets to processors.
271 // TODO: implement it as a cache so unused sockets are expired automatically
272 private final ConcurrentMap<TSocket, ControlPlaneService.Processor<InternalServiceHandler>> processors =
273 Maps.newConcurrentMap();
274
275 @Override
276 public boolean process(final TProtocol in, final TProtocol out) throws TException {
277 // Get the socket for this request.
278 TSocket socket = (TSocket) in.getTransport();
279 // Get or create a processor for this socket
280 ControlPlaneService.Processor<InternalServiceHandler> processor = processors.computeIfAbsent(socket, s -> {
281 InternalServiceHandler handler = new InternalServiceHandler(s);
282 return new ControlPlaneService.Processor<>(handler);
283 });
284 // Delegate to the processor we are decorating.
285 return processor.process(in, out);
286 }
287 }
288
289 /**
290 * Transport/client cache loader.
291 */
292 private class ClientLoader extends CacheLoader<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> {
293
294 private final SafeThriftClient.Options options = new SafeThriftClient.Options(NUM_CONNECTION_RETRIES,
295 TIME_BETWEEN_RETRIES);
296
297 @Override
298 public Pair<TTransport, Bmv2DeviceThriftClient> load(DeviceId deviceId)
299 throws TTransportException {
300 log.debug("Instantiating new client... > deviceId={}", deviceId);
301 // Make the expensive call
302 Bmv2Device device = Bmv2Device.of(deviceId);
303 TTransport transport = new TSocket(device.thriftServerHost(), device.thriftServerPort());
304 TProtocol protocol = new TBinaryProtocol(transport);
305 // Our BMv2 device implements multiple Thrift services, create a client for each one on the same transport.
306 Standard.Client standardClient = new Standard.Client(
307 new TMultiplexedProtocol(protocol, "standard"));
308 SimpleSwitch.Client simpleSwitch = new SimpleSwitch.Client(
309 new TMultiplexedProtocol(protocol, "simple_switch"));
310 // Wrap clients so to automatically have synchronization and resiliency to connectivity errors
311 Standard.Iface safeStandardClient = SafeThriftClient.wrap(standardClient,
312 Standard.Iface.class,
313 options);
314 SimpleSwitch.Iface safeSimpleSwitchClient = SafeThriftClient.wrap(simpleSwitch,
315 SimpleSwitch.Iface.class,
316 options);
317 Bmv2DeviceThriftClient client = new Bmv2DeviceThriftClient(deviceId,
318 transport,
319 safeStandardClient,
320 safeSimpleSwitchClient);
321 return Pair.of(transport, client);
322 }
323 }
324}