/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.bmv2.ctl;
18
19import com.google.common.cache.CacheBuilder;
20import com.google.common.cache.CacheLoader;
21import com.google.common.cache.LoadingCache;
22import com.google.common.cache.RemovalListener;
23import com.google.common.cache.RemovalNotification;
24import com.google.common.collect.Maps;
25import org.apache.commons.lang3.tuple.Pair;
26import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.apache.thrift.TException;
33import org.apache.thrift.TProcessor;
34import org.apache.thrift.protocol.TBinaryProtocol;
35import org.apache.thrift.protocol.TMultiplexedProtocol;
36import org.apache.thrift.protocol.TProtocol;
37import org.apache.thrift.server.TThreadPoolServer;
38import org.apache.thrift.transport.TServerSocket;
39import org.apache.thrift.transport.TServerTransport;
40import org.apache.thrift.transport.TSocket;
41import org.apache.thrift.transport.TTransport;
42import org.apache.thrift.transport.TTransportException;
43import org.onlab.util.ImmutableByteSequence;
44import org.onosproject.bmv2.api.service.Bmv2Controller;
45import org.onosproject.bmv2.api.runtime.Bmv2Device;
46import org.onosproject.bmv2.api.runtime.Bmv2DeviceAgent;
47import org.onosproject.bmv2.api.service.Bmv2DeviceListener;
48import org.onosproject.bmv2.api.service.Bmv2PacketListener;
49import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
50import org.onosproject.bmv2.thriftapi.ControlPlaneService;
51import org.onosproject.bmv2.thriftapi.SimpleSwitch;
52import org.onosproject.bmv2.thriftapi.Standard;
53import org.onosproject.core.CoreService;
54import org.onosproject.net.DeviceId;
55import org.slf4j.Logger;
56import org.slf4j.LoggerFactory;
57
58import java.nio.ByteBuffer;
59import java.util.List;
60import java.util.Set;
61import java.util.concurrent.ConcurrentMap;
62import java.util.concurrent.CopyOnWriteArraySet;
63import java.util.concurrent.ExecutionException;
64import java.util.concurrent.ExecutorService;
65import java.util.concurrent.Executors;
66import java.util.concurrent.TimeUnit;
67
68import static com.google.common.base.Preconditions.checkNotNull;
69import static org.onlab.util.Tools.groupedThreads;
70
71/**
72 * Default implementation of a BMv2 controller.
73 */
74@Component(immediate = true)
75@Service
76public class Bmv2ControllerImpl implements Bmv2Controller {
77
78 private static final String APP_ID = "org.onosproject.bmv2";
79
80 // Seconds after a client is expired (and connection closed) in the cache.
81 private static final int CLIENT_CACHE_TIMEOUT = 60;
82 // Number of connection retries after a network error.
83 private static final int NUM_CONNECTION_RETRIES = 2;
84 // Time between retries in milliseconds.
85 private static final int TIME_BETWEEN_RETRIES = 10;
86
87 private final Logger log = LoggerFactory.getLogger(this.getClass());
88
89 // Cache where clients are removed after a predefined timeout.
90 private final LoadingCache<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> agentCache =
91 CacheBuilder.newBuilder()
92 .expireAfterAccess(CLIENT_CACHE_TIMEOUT, TimeUnit.SECONDS)
93 .removalListener(new ClientRemovalListener())
94 .build(new ClientLoader());
95
96 private final InternalTrackingProcessor trackingProcessor = new InternalTrackingProcessor();
97
98 private final ExecutorService executorService = Executors
99 .newFixedThreadPool(16, groupedThreads("onos/bmv2", "controller", log));
100
101 private final Set<Bmv2DeviceListener> deviceListeners = new CopyOnWriteArraySet<>();
102 private final Set<Bmv2PacketListener> packetListeners = new CopyOnWriteArraySet<>();
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected CoreService coreService;
106
107 private TThreadPoolServer thriftServer;
108
109 // TODO: make it configurable trough component config
110 private int serverPort = DEFAULT_PORT;
111
112 @Activate
113 public void activate() {
114 coreService.registerApplication(APP_ID);
115 startServer(serverPort);
116 log.info("Activated");
117 }
118
119 @Deactivate
120 public void deactivate() {
121 stopServer();
122 log.info("Deactivated");
123 }
124
125 private void startServer(int port) {
126 try {
127 TServerTransport transport = new TServerSocket(port);
128 log.info("Starting server on port {}...", port);
129 this.thriftServer = new TThreadPoolServer(new TThreadPoolServer.Args(transport)
130 .processor(trackingProcessor)
131 .executorService(executorService));
132 executorService.execute(thriftServer::serve);
133 } catch (TTransportException e) {
134 log.error("Unable to start server", e);
135 }
136 }
137
138 private void stopServer() {
139 // Stop the server if running...
140 if (thriftServer != null && !thriftServer.isServing()) {
141 thriftServer.stop();
142 }
143 try {
144 executorService.shutdown();
145 executorService.awaitTermination(5, TimeUnit.SECONDS);
146 } catch (InterruptedException e) {
147 List<Runnable> runningTasks = executorService.shutdownNow();
148 log.warn("Unable to stop server threads: {}", runningTasks);
149 }
150 }
151
152 @Override
153 public Bmv2DeviceAgent getAgent(DeviceId deviceId) throws Bmv2RuntimeException {
154 try {
155 checkNotNull(deviceId, "deviceId cannot be null");
156 return agentCache.get(deviceId).getRight();
157 } catch (ExecutionException e) {
158 throw new Bmv2RuntimeException(e);
159 }
160 }
161
162 @Override
163 public boolean isReacheable(DeviceId deviceId) {
164 try {
165 return getAgent(deviceId).ping();
166 } catch (Bmv2RuntimeException e) {
167 return false;
168 }
169 }
170
171 @Override
172 public void addDeviceListener(Bmv2DeviceListener listener) {
173 if (!deviceListeners.contains(listener)) {
174 deviceListeners.add(listener);
175 }
176 }
177
178 @Override
179 public void removeDeviceListener(Bmv2DeviceListener listener) {
180 deviceListeners.remove(listener);
181 }
182
183 @Override
184 public void addPacketListener(Bmv2PacketListener listener) {
185 if (!packetListeners.contains(listener)) {
186 packetListeners.add(listener);
187 }
188 }
189
190 @Override
191 public void removePacketListener(Bmv2PacketListener listener) {
192 packetListeners.remove(listener);
193 }
194
195 /**
196 * Client cache removal listener. Close the connection on cache removal.
197 */
198 private static class ClientRemovalListener implements
199 RemovalListener<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> {
200
201 @Override
202 public void onRemoval(RemovalNotification<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> notification) {
203 // close the transport connection
204 Bmv2DeviceThriftClient client = notification.getValue().getRight();
205 TTransport transport = notification.getValue().getLeft();
206 // Locking here is ugly, but needed (see SafeThriftClient).
207 synchronized (transport) {
208 if (transport.isOpen()) {
209 transport.close();
210 }
211 }
212 }
213 }
214
215 /**
216 * Handles Thrift calls from BMv2 devices using registered listeners.
217 */
218 private final class InternalServiceHandler implements ControlPlaneService.Iface {
219
220 private final TSocket socket;
221 private Bmv2Device remoteDevice;
222
223 private InternalServiceHandler(TSocket socket) {
224 this.socket = socket;
225 }
226
227 @Override
228 public boolean ping() {
229 return true;
230 }
231
232 @Override
233 public void hello(int thriftServerPort, int deviceId, int instanceId, String jsonConfigMd5) {
234 // Locally note the remote device for future uses.
235 String host = socket.getSocket().getInetAddress().getHostAddress();
236 remoteDevice = new Bmv2Device(host, thriftServerPort, deviceId);
237
238 if (deviceListeners.size() == 0) {
239 log.debug("Received hello, but there's no listener registered.");
240 } else {
241 deviceListeners.forEach(
242 l -> executorService.execute(() -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5)));
243 }
244 }
245
246 @Override
247 public void packetIn(int port, long reason, int tableId, int contextId, ByteBuffer packet) {
248 if (remoteDevice == null) {
249 log.debug("Received packet-in, but the remote device is still unknown. Need a hello first...");
250 return;
251 }
252
253 if (packetListeners.size() == 0) {
254 log.debug("Received packet-in, but there's no listener registered.");
255 } else {
256 packetListeners.forEach(
257 l -> executorService.execute(() -> l.handlePacketIn(remoteDevice,
258 port,
259 reason,
260 tableId,
261 contextId,
262 ImmutableByteSequence.copyFrom(packet))));
263 }
264 }
265 }
266
267 /**
268 * Thrift Processor decorator. This class is needed in order to have access to the socket when handling a call.
269 * Socket is needed to get the IP address of the client originating the call (see InternalServiceHandler.hello())
270 */
271 private final class InternalTrackingProcessor implements TProcessor {
272
273 // Map sockets to processors.
274 // TODO: implement it as a cache so unused sockets are expired automatically
275 private final ConcurrentMap<TSocket, ControlPlaneService.Processor<InternalServiceHandler>> processors =
276 Maps.newConcurrentMap();
277
278 @Override
279 public boolean process(final TProtocol in, final TProtocol out) throws TException {
280 // Get the socket for this request.
281 TSocket socket = (TSocket) in.getTransport();
282 // Get or create a processor for this socket
283 ControlPlaneService.Processor<InternalServiceHandler> processor = processors.computeIfAbsent(socket, s -> {
284 InternalServiceHandler handler = new InternalServiceHandler(s);
285 return new ControlPlaneService.Processor<>(handler);
286 });
287 // Delegate to the processor we are decorating.
288 return processor.process(in, out);
289 }
290 }
291
292 /**
293 * Transport/client cache loader.
294 */
295 private class ClientLoader extends CacheLoader<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> {
296
297 private final SafeThriftClient.Options options = new SafeThriftClient.Options(NUM_CONNECTION_RETRIES,
298 TIME_BETWEEN_RETRIES);
299
300 @Override
301 public Pair<TTransport, Bmv2DeviceThriftClient> load(DeviceId deviceId)
302 throws TTransportException {
303 log.debug("Instantiating new client... > deviceId={}", deviceId);
304 // Make the expensive call
305 Bmv2Device device = Bmv2Device.of(deviceId);
306 TTransport transport = new TSocket(device.thriftServerHost(), device.thriftServerPort());
307 TProtocol protocol = new TBinaryProtocol(transport);
308 // Our BMv2 device implements multiple Thrift services, create a client for each one on the same transport.
309 Standard.Client standardClient = new Standard.Client(
310 new TMultiplexedProtocol(protocol, "standard"));
311 SimpleSwitch.Client simpleSwitch = new SimpleSwitch.Client(
312 new TMultiplexedProtocol(protocol, "simple_switch"));
313 // Wrap clients so to automatically have synchronization and resiliency to connectivity errors
314 Standard.Iface safeStandardClient = SafeThriftClient.wrap(standardClient,
315 Standard.Iface.class,
316 options);
317 SimpleSwitch.Iface safeSimpleSwitchClient = SafeThriftClient.wrap(simpleSwitch,
318 SimpleSwitch.Iface.class,
319 options);
320 Bmv2DeviceThriftClient client = new Bmv2DeviceThriftClient(deviceId,
321 transport,
322 safeStandardClient,
323 safeSimpleSwitchClient);
324 return Pair.of(transport, client);
325 }
326 }
327}