/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.bmv2.ctl;
18
19import com.google.common.collect.Maps;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.apache.thrift.TException;
27import org.apache.thrift.TProcessor;
28import org.apache.thrift.protocol.TProtocol;
29import org.apache.thrift.server.TThreadPoolServer;
30import org.apache.thrift.transport.TServerSocket;
31import org.apache.thrift.transport.TServerTransport;
32import org.apache.thrift.transport.TSocket;
33import org.apache.thrift.transport.TTransportException;
34import org.onlab.util.ImmutableByteSequence;
35import org.onosproject.bmv2.api.runtime.Bmv2ControlPlaneServer;
36import org.onosproject.bmv2.api.runtime.Bmv2Device;
37import org.onosproject.core.CoreService;
38import org.p4.bmv2.thrift.ControlPlaneService;
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
42import java.nio.ByteBuffer;
43import java.util.Set;
44import java.util.concurrent.ConcurrentMap;
45import java.util.concurrent.CopyOnWriteArraySet;
46import java.util.concurrent.ExecutorService;
47import java.util.concurrent.Executors;
48import java.util.concurrent.TimeUnit;
49
50import static org.onlab.util.Tools.groupedThreads;
51import static org.p4.bmv2.thrift.ControlPlaneService.Processor;
52
53@Component(immediate = true)
54@Service
55public class Bmv2ControlPlaneThriftServer implements Bmv2ControlPlaneServer {
56
57 private static final String APP_ID = "org.onosproject.bmv2";
58 private static final Logger LOG = LoggerFactory.getLogger(Bmv2ControlPlaneThriftServer.class);
59
60 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
61 protected CoreService coreService;
62
63 private final InternalTrackingProcessor trackingProcessor = new InternalTrackingProcessor();
64 private final ExecutorService executorService = Executors
65 .newFixedThreadPool(16, groupedThreads("onos/bmv2", "control-plane-server", LOG));
66
67 private final Set<HelloListener> helloListeners = new CopyOnWriteArraySet<>();
68 private final Set<PacketListener> packetListeners = new CopyOnWriteArraySet<>();
69
70 private TThreadPoolServer thriftServer;
71 private int serverPort = DEFAULT_PORT;
72
73 @Activate
74 public void activate() {
75 coreService.registerApplication(APP_ID);
76 try {
77 TServerTransport transport = new TServerSocket(serverPort);
78 LOG.info("Starting server on port {}...", serverPort);
79 this.thriftServer = new TThreadPoolServer(new TThreadPoolServer.Args(transport)
80 .processor(trackingProcessor)
81 .executorService(executorService));
82 executorService.execute(thriftServer::serve);
83 } catch (TTransportException e) {
84 LOG.error("Unable to start server", e);
85 }
86 LOG.info("Activated");
87 }
88
89 @Deactivate
90 public void deactivate() {
91 // Stop the server if running...
92 if (thriftServer != null && !thriftServer.isServing()) {
93 thriftServer.stop();
94 }
95 try {
96 executorService.awaitTermination(1, TimeUnit.SECONDS);
97 } catch (InterruptedException e) {
98 LOG.error("Server threads did not terminate");
99 }
100 executorService.shutdownNow();
101 LOG.info("Deactivated");
102 }
103
104 @Override
105 public void addHelloListener(HelloListener listener) {
106 if (!helloListeners.contains(listener)) {
107 helloListeners.add(listener);
108 }
109 }
110
111 @Override
112 public void removeHelloListener(HelloListener listener) {
113 helloListeners.remove(listener);
114 }
115
116 @Override
117 public void addPacketListener(PacketListener listener) {
118 if (!packetListeners.contains(listener)) {
119 packetListeners.add(listener);
120 }
121 }
122
123 @Override
124 public void removePacketListener(PacketListener listener) {
125 packetListeners.remove(listener);
126 }
127
128 /**
129 * Handles service calls using registered listeners.
130 */
131 private final class InternalServiceHandler implements ControlPlaneService.Iface {
132
133 private final TSocket socket;
134 private Bmv2Device remoteDevice;
135
136 private InternalServiceHandler(TSocket socket) {
137 this.socket = socket;
138 }
139
140 @Override
141 public boolean ping() {
142 return true;
143 }
144
145 @Override
146 public void hello(int thriftServerPort, int deviceId) {
147 // Locally note the remote device for future uses.
148 String host = socket.getSocket().getInetAddress().getHostAddress();
149 remoteDevice = new Bmv2Device(host, thriftServerPort, deviceId);
150
151 if (helloListeners.size() == 0) {
152 LOG.debug("Received hello, but there's no listener registered.");
153 } else {
154 helloListeners.forEach(listener -> listener.handleHello(remoteDevice));
155 }
156 }
157
158 @Override
159 public void packetIn(int port, long reason, int tableId, int contextId, ByteBuffer packet) {
160 if (remoteDevice == null) {
161 LOG.debug("Received packet-in, but the remote device is still unknown. Need a hello first...");
162 return;
163 }
164
165 if (packetListeners.size() == 0) {
166 LOG.debug("Received packet-in, but there's no listener registered.");
167 } else {
168 packetListeners.forEach(listener -> listener.handlePacketIn(remoteDevice,
169 port,
170 reason,
171 tableId,
172 contextId,
173 ImmutableByteSequence.copyFrom(packet)));
174 }
175 }
176 }
177
178 /**
179 * Thrift Processor decorator. This class is needed in order to have access to the socket when handling a call.
180 * Socket is needed to get the IP address of the client originating the call (see InternalServiceHandler.hello())
181 */
182 private final class InternalTrackingProcessor implements TProcessor {
183
184 // Map sockets to processors.
185 // TODO: implement it as a cache so unused sockets are expired automatically
186 private final ConcurrentMap<TSocket, Processor<InternalServiceHandler>> processors = Maps.newConcurrentMap();
187
188 @Override
189 public boolean process(final TProtocol in, final TProtocol out) throws TException {
190 // Get the socket for this request.
191 TSocket socket = (TSocket) in.getTransport();
192 // Get or create a processor for this socket
193 Processor<InternalServiceHandler> processor = processors.computeIfAbsent(socket, s -> {
194 InternalServiceHandler handler = new InternalServiceHandler(s);
195 return new Processor<>(handler);
196 });
197 // Delegate to the processor we are decorating.
198 return processor.process(in, out);
199 }
200 }
201}