BMv2 performance improvements
- Implemented a non-blocking Thrift server for the controller (the previous
blocking server limited the number of active connections)
- Improved configuration swap times by forcing the swap
- Minor bugfixes and polishing
- Update onos-bmv2 repo URL in thrift-api pom.xml
Change-Id: I13b61f5aa22558c395768e3b445f302b20c5bd33
diff --git a/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java b/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java
index 077a294..ed15134 100644
--- a/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java
+++ b/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java
@@ -53,9 +53,14 @@
void registerInterpreterClassLoader(Class<? extends Bmv2Interpreter> interpreterClass, ClassLoader loader);
/**
- * Returns a default context.
+ * Returns the default context.
*
* @return a BMv2 device context
*/
Bmv2DeviceContext defaultContext();
+
+ /**
+ * Sets the default context for the given device.
+ */
+ void setDefaultContext(DeviceId deviceId);
}
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControlPlaneThriftServer.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControlPlaneThriftServer.java
new file mode 100644
index 0000000..bf44423
--- /dev/null
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControlPlaneThriftServer.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.bmv2.ctl;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * A Thrift TThreadedSelectorServer that keeps track of the IP address of each connected client.
+ */
+final class Bmv2ControlPlaneThriftServer extends TThreadedSelectorServer {
+
+ private static final int MAX_WORKER_THREADS = 20;
+ private static final int MAX_SELECTOR_THREADS = 4;
+ private static final int ACCEPT_QUEUE_LEN = 8;
+
+ private final Map<TTransport, InetAddress> clientAddresses = Maps.newConcurrentMap();
+ private final Set<TrackingSelectorThread> selectorThreads = Sets.newHashSet();
+
+ private AcceptThread acceptThread;
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ /**
+ * Creates a new server.
+ *
+ * @param port a listening port
+ * @param processor a processor
+ * @param executorService an executor service
+ * @throws TTransportException if the non-blocking server socket cannot be created on the given port
+ */
+ public Bmv2ControlPlaneThriftServer(int port, TProcessor processor, ExecutorService executorService)
+ throws TTransportException {
+ super(new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port))
+ .workerThreads(MAX_WORKER_THREADS)
+ .selectorThreads(MAX_SELECTOR_THREADS)
+ .acceptQueueSizePerThread(ACCEPT_QUEUE_LEN)
+ .executorService(executorService)
+ .processor(processor));
+ }
+
+ /**
+ * Returns the IP address of the client associated with the given input framed transport.
+ *
+ * @param inputTransport a framed transport instance
+ * @return the IP address of the client or null
+ */
+ InetAddress getClientAddress(TFramedTransport inputTransport) {
+ return clientAddresses.get(inputTransport);
+ }
+
+ @Override
+ protected boolean startThreads() {
+ try {
+ for (int i = 0; i < MAX_SELECTOR_THREADS; ++i) {
+ selectorThreads.add(new TrackingSelectorThread(ACCEPT_QUEUE_LEN));
+ }
+ acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
+ createSelectorThreadLoadBalancer(selectorThreads));
+ selectorThreads.forEach(Thread::start);
+ acceptThread.start();
+ return true;
+ } catch (IOException e) {
+ log.error("Failed to start threads!", e);
+ return false;
+ }
+ }
+
+ @Override
+ protected void joinThreads() throws InterruptedException {
+ // Wait until the io threads exit.
+ acceptThread.join();
+ for (TThreadedSelectorServer.SelectorThread thread : selectorThreads) {
+ thread.join();
+ }
+ }
+
+ @Override
+ public void stop() {
+ stopped_ = true;
+ // Stop queuing connect attempts asap.
+ stopListening();
+ if (acceptThread != null) {
+ acceptThread.wakeupSelector();
+ }
+ if (selectorThreads != null) {
+ selectorThreads.stream()
+ .filter(thread -> thread != null)
+ .forEach(TrackingSelectorThread::wakeupSelector);
+ }
+ }
+
+ private class TrackingSelectorThread extends TThreadedSelectorServer.SelectorThread {
+
+ TrackingSelectorThread(int maxPendingAccepts) throws IOException {
+ super(maxPendingAccepts);
+ }
+
+ @Override
+ protected FrameBuffer createFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
+ AbstractSelectThread selectThread) {
+ TrackingFrameBuffer frameBuffer = new TrackingFrameBuffer(trans, selectionKey, selectThread);
+ if (trans instanceof TNonblockingSocket) {
+ try {
+ SocketChannel socketChannel = ((TNonblockingSocket) trans).getSocketChannel();
+ InetAddress addr = ((InetSocketAddress) socketChannel.getRemoteAddress()).getAddress();
+ clientAddresses.put(frameBuffer.getInputFramedTransport(), addr);
+ } catch (IOException e) {
+ log.warn("Exception while tracking client address", e);
+ clientAddresses.remove(frameBuffer.getInputFramedTransport());
+ }
+ } else {
+ log.warn("Unknown TNonblockingTransport instance: {}", trans.getClass().getName());
+ clientAddresses.remove(frameBuffer.getInputFramedTransport());
+ }
+ return frameBuffer;
+ }
+ }
+
+ private class TrackingFrameBuffer extends FrameBuffer {
+
+ TrackingFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
+ AbstractSelectThread selectThread) {
+ super(trans, selectionKey, selectThread);
+ }
+
+ TTransport getInputFramedTransport() {
+ return this.inTrans_;
+ }
+ }
+}
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java
index 5409fae..4a63d21 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java
@@ -34,19 +34,18 @@
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.onlab.util.ImmutableByteSequence;
-import org.onosproject.bmv2.api.service.Bmv2Controller;
import org.onosproject.bmv2.api.runtime.Bmv2Device;
import org.onosproject.bmv2.api.runtime.Bmv2DeviceAgent;
+import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
+import org.onosproject.bmv2.api.service.Bmv2Controller;
import org.onosproject.bmv2.api.service.Bmv2DeviceListener;
import org.onosproject.bmv2.api.service.Bmv2PacketListener;
-import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
+import org.onosproject.bmv2.thriftapi.BmConfig;
import org.onosproject.bmv2.thriftapi.ControlPlaneService;
import org.onosproject.bmv2.thriftapi.SimpleSwitch;
import org.onosproject.bmv2.thriftapi.Standard;
@@ -55,6 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
@@ -67,6 +67,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.bmv2.thriftapi.ControlPlaneService.Processor;
/**
* Default implementation of a BMv2 controller.
@@ -93,10 +94,10 @@
.removalListener(new ClientRemovalListener())
.build(new ClientLoader());
- private final InternalTrackingProcessor trackingProcessor = new InternalTrackingProcessor();
+ private final TProcessor trackingProcessor = new TrackingProcessor();
private final ExecutorService executorService = Executors
- .newFixedThreadPool(16, groupedThreads("onos/bmv2", "controller", log));
+ .newFixedThreadPool(32, groupedThreads("onos/bmv2", "controller", log));
private final Set<Bmv2DeviceListener> deviceListeners = new CopyOnWriteArraySet<>();
private final Set<Bmv2PacketListener> packetListeners = new CopyOnWriteArraySet<>();
@@ -104,7 +105,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
- private TThreadPoolServer thriftServer;
+ private Bmv2ControlPlaneThriftServer server;
// TODO: make it configurable trough component config
private int serverPort = DEFAULT_PORT;
@@ -124,12 +125,9 @@
private void startServer(int port) {
try {
- TServerTransport transport = new TServerSocket(port);
log.info("Starting server on port {}...", port);
- this.thriftServer = new TThreadPoolServer(new TThreadPoolServer.Args(transport)
- .processor(trackingProcessor)
- .executorService(executorService));
- executorService.execute(thriftServer::serve);
+ this.server = new Bmv2ControlPlaneThriftServer(port, trackingProcessor, executorService);
+ executorService.execute(server::serve);
} catch (TTransportException e) {
log.error("Unable to start server", e);
}
@@ -137,8 +135,9 @@
private void stopServer() {
// Stop the server if running...
- if (thriftServer != null && !thriftServer.isServing()) {
- thriftServer.stop();
+ if (server != null && !server.isServing()) {
+ server.setShouldStop(true);
+ server.stop();
}
try {
executorService.shutdown();
@@ -162,8 +161,11 @@
@Override
public boolean isReacheable(DeviceId deviceId) {
try {
- return getAgent(deviceId).ping();
- } catch (Bmv2RuntimeException e) {
+ Bmv2DeviceThriftClient client = (Bmv2DeviceThriftClient) getAgent(deviceId);
+ BmConfig config = client.standardClient.bm_mgmt_get_info();
+ // The BMv2 instance running at this thrift IP and port might have a different BMv2 internal ID.
+ return config.getDevice_id() == Integer.valueOf(deviceId.uri().getFragment());
+ } catch (Bmv2RuntimeException | TException e) {
return false;
}
}
@@ -213,15 +215,15 @@
}
/**
- * Handles Thrift calls from BMv2 devices using registered listeners.
+ * Handles requests from BMv2 devices using the registered listeners.
*/
- private final class InternalServiceHandler implements ControlPlaneService.Iface {
+ private final class ServiceHandler implements ControlPlaneService.Iface {
- private final TSocket socket;
+ private final InetAddress clientAddress;
private Bmv2Device remoteDevice;
- private InternalServiceHandler(TSocket socket) {
- this.socket = socket;
+ ServiceHandler(InetAddress clientAddress) {
+ this.clientAddress = clientAddress;
}
@Override
@@ -231,20 +233,19 @@
@Override
public void hello(int thriftServerPort, int deviceId, int instanceId, String jsonConfigMd5) {
- // Locally note the remote device for future uses.
- String host = socket.getSocket().getInetAddress().getHostAddress();
+ // Store a reference to the remote device for future use.
+ String host = clientAddress.getHostAddress();
remoteDevice = new Bmv2Device(host, thriftServerPort, deviceId);
if (deviceListeners.size() == 0) {
log.debug("Received hello, but there's no listener registered.");
} else {
- deviceListeners.forEach(
- l -> executorService.execute(() -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5)));
+ deviceListeners.forEach(l -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5));
}
}
@Override
- public void packet_in(int port, ByteBuffer packet) {
+ public void packet_in(int port, ByteBuffer data, int dataLength) {
if (remoteDevice == null) {
log.debug("Received packet-in, but the remote device is still unknown. Need a hello first...");
return;
@@ -253,41 +254,42 @@
if (packetListeners.size() == 0) {
log.debug("Received packet-in, but there's no listener registered.");
} else {
- packetListeners.forEach(
- l -> executorService.execute(() -> l.handlePacketIn(remoteDevice,
- port,
- ImmutableByteSequence.copyFrom(packet))));
+ byte[] bytes = new byte[dataLength];
+ data.get(bytes);
+ ImmutableByteSequence pkt = ImmutableByteSequence.copyFrom(bytes);
+ packetListeners.forEach(l -> l.handlePacketIn(remoteDevice, port, pkt));
}
}
}
/**
- * Thrift Processor decorator. This class is needed in order to have access to the socket when handling a call.
- * Socket is needed to get the IP address of the client originating the call (see InternalServiceHandler.hello())
+ * Decorator of a Thrift processor that keeps track of the IP address of the client that originated each
+ * request.
*/
- private final class InternalTrackingProcessor implements TProcessor {
+ private final class TrackingProcessor implements TProcessor {
- // Map sockets to processors.
- // TODO: implement it as a cache so unused sockets are expired automatically
- private final ConcurrentMap<TSocket, ControlPlaneService.Processor<InternalServiceHandler>> processors =
- Maps.newConcurrentMap();
+ // Map transports to processors.
+ private final ConcurrentMap<TTransport, Processor<ServiceHandler>> processors = Maps.newConcurrentMap();
@Override
public boolean process(final TProtocol in, final TProtocol out) throws TException {
- // Get the socket for this request.
- TSocket socket = (TSocket) in.getTransport();
- // Get or create a processor for this socket
- ControlPlaneService.Processor<InternalServiceHandler> processor = processors.computeIfAbsent(socket, s -> {
- InternalServiceHandler handler = new InternalServiceHandler(s);
- return new ControlPlaneService.Processor<>(handler);
- });
- // Delegate to the processor we are decorating.
- return processor.process(in, out);
+ // Get the client address for this request.
+ InetAddress clientAddress = server.getClientAddress((TFramedTransport) in.getTransport());
+ if (clientAddress != null) {
+ // Get or create a processor for this input transport, i.e. the client on the other side.
+ Processor<ServiceHandler> processor = processors.computeIfAbsent(
+ in.getTransport(), t -> new Processor<>(new ServiceHandler(clientAddress)));
+ // Delegate to the processor we are decorating.
+ return processor.process(in, out);
+ } else {
+ log.warn("Unable to retrieve client IP address of incoming request");
+ return false;
+ }
}
}
/**
- * Transport/client cache loader.
+ * Cache loader of BMv2 Thrift clients.
*/
private class ClientLoader extends CacheLoader<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> {
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
index c6f3d04..e077b6d 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
@@ -171,6 +171,17 @@
return defaultContext;
}
+ @Override
+ public void setDefaultContext(DeviceId deviceId) {
+ Versioned<Bmv2DeviceContext> previous = contexts.put(deviceId, defaultContext);
+ if (mastershipService.getMasterFor(deviceId) == null) {
+ // Checking whether a master exists here is ugly but necessary, as Bmv2DeviceProvider calls this method
+ // before master election. A cleaner solution could be to use a separate leadership contest instead of the
+ // mastership service.
+ triggerConfigCheck(deviceId, defaultContext);
+ }
+ }
+
private void configCheck(DeviceId deviceId, Bmv2DeviceContext storedContext) {
if (storedContext == null) {
return;
@@ -206,14 +217,14 @@
}
private void triggerConfigCheck(DeviceId deviceId, Bmv2DeviceContext context) {
- if (mastershipService.isLocalMaster(deviceId)) {
scheduledExecutor.schedule(() -> configCheck(deviceId, context), 0, TimeUnit.SECONDS);
- }
}
private void checkDevices() {
deviceService.getAvailableDevices().forEach(device -> {
- triggerConfigCheck(device.id(), getContext(device.id()));
+ if (mastershipService.isLocalMaster(device.id())) {
+ triggerConfigCheck(device.id(), getContext(device.id()));
+ }
});
}
@@ -235,8 +246,10 @@
public void event(MapEvent<DeviceId, Bmv2DeviceContext> event) {
DeviceId deviceId = event.key();
if (event.type().equals(INSERT) || event.type().equals(UPDATE)) {
- log.trace("Context {} for {}", event.type().name(), deviceId);
- triggerConfigCheck(deviceId, event.newValue().value());
+ if (mastershipService.isLocalMaster(deviceId)) {
+ log.trace("Context {} for {}", event.type().name(), deviceId);
+ triggerConfigCheck(deviceId, event.newValue().value());
+ }
}
}
}
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java
index e9e7faa..5bd5228 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java
@@ -68,7 +68,7 @@
// See: https://github.com/p4lang/behavioral-model/blob/master/modules/bm_sim/include/bm_sim/context.h
private static final int CONTEXT_ID = 0;
- private final Standard.Iface standardClient;
+ protected final Standard.Iface standardClient;
private final SimpleSwitch.Iface simpleSwitchClient;
private final TTransport transport;
private final DeviceId deviceId;
@@ -418,6 +418,7 @@
try {
standardClient.bm_swap_configs();
+ simpleSwitchClient.force_swap();
log.debug("JSON config swapped! > deviceId={}", deviceId);
} catch (TException e) {
log.debug("Exception while swapping JSON config: {} > deviceId={}", e, deviceId);
diff --git a/protocols/bmv2/thrift-api/pom.xml b/protocols/bmv2/thrift-api/pom.xml
index 7b9bd6b..4f4e427 100644
--- a/protocols/bmv2/thrift-api/pom.xml
+++ b/protocols/bmv2/thrift-api/pom.xml
@@ -32,9 +32,9 @@
<properties>
<!-- BMv2 Commit ID and Thrift version -->
- <bmv2.commit>024aa03e3b52f8d32c26774511e8e5b1dc11ec65</bmv2.commit>
+ <bmv2.commit>8f675d0284e9e014f1b8ed502ba54e61d68108cf</bmv2.commit>
<bmv2.thrift.version>0.9.3</bmv2.thrift.version>
- <bmv2.baseurl>https://cdn.rawgit.com/opennetworkinglab/behavioral-model/${bmv2.commit}</bmv2.baseurl>
+ <bmv2.baseurl>https://cdn.rawgit.com/opennetworkinglab/onos-bmv2/${bmv2.commit}</bmv2.baseurl>
<bmv2.thrift.javanamespace>org.onosproject.bmv2.thriftapi</bmv2.thrift.javanamespace>
<bmv2.thrift.srcdir>${project.build.directory}/thrift-sources/${bmv2.commit}/</bmv2.thrift.srcdir>
<thrift.exedir>${project.build.directory}/thrift-compiler/</thrift.exedir>