BMv2 performance improvements

- Implemented a non-blocking Thrift server for the controller (the previous
	blocking server limited the number of active connections)
- Improved configuration swap times by forcing the swap on the device
- Minor bugfixes and polishing
- Updated onos-bmv2 repo URL in thrift-api pom.xml

Change-Id: I13b61f5aa22558c395768e3b445f302b20c5bd33
diff --git a/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java b/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java
index 077a294..ed15134 100644
--- a/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java
+++ b/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java
@@ -53,9 +53,16 @@
     void registerInterpreterClassLoader(Class<? extends Bmv2Interpreter> interpreterClass, ClassLoader loader);
 
     /**
-     * Returns a default context.
+     * Returns the default context.
      *
      * @return a BMv2 device context
      */
     Bmv2DeviceContext defaultContext();
+
+    /**
+     * Sets the default context for the given device.
+     *
+     * @param deviceId a device ID
+     */
+    void setDefaultContext(DeviceId deviceId);
 }
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControlPlaneThriftServer.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControlPlaneThriftServer.java
new file mode 100644
index 0000000..bf44423
--- /dev/null
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControlPlaneThriftServer.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.bmv2.ctl;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * A Thrift TThreadedSelectorServer that keeps track of the clients' IP address.
+ * <p>
+ * The address of each client is recorded when the frame buffer for its connection is created, and it is discarded
+ * when that frame buffer is closed.
+ */
+final class Bmv2ControlPlaneThriftServer extends TThreadedSelectorServer {
+
+    private static final int MAX_WORKER_THREADS = 20;
+    private static final int MAX_SELECTOR_THREADS = 4;
+    private static final int ACCEPT_QUEUE_LEN = 8;
+
+    // Client addresses, indexed by the input transport of the frame buffer that serves the connection.
+    private final Map<TTransport, InetAddress> clientAddresses = Maps.newConcurrentMap();
+    private final Set<TrackingSelectorThread> selectorThreads = Sets.newHashSet();
+
+    private AcceptThread acceptThread;
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    /**
+     * Creates a new server.
+     *
+     * @param port            a listening port
+     * @param processor       a processor
+     * @param executorService an executor service
+     * @throws TTransportException if the listening socket cannot be created
+     */
+    public Bmv2ControlPlaneThriftServer(int port, TProcessor processor, ExecutorService executorService)
+            throws TTransportException {
+        super(new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port))
+                      .workerThreads(MAX_WORKER_THREADS)
+                      .selectorThreads(MAX_SELECTOR_THREADS)
+                      .acceptQueueSizePerThread(ACCEPT_QUEUE_LEN)
+                      .executorService(executorService)
+                      .processor(processor));
+    }
+
+    /**
+     * Returns the IP address of the client associated with the given input framed transport.
+     *
+     * @param inputTransport a framed transport instance
+     * @return the IP address of the client, or null if the transport is not (or no longer) tracked
+     */
+    InetAddress getClientAddress(TFramedTransport inputTransport) {
+        return clientAddresses.get(inputTransport);
+    }
+
+    /**
+     * Starts the selector and accept threads, using selector threads that track the client addresses.
+     *
+     * @return true if all threads were started, false if an I/O error occurred while creating them
+     */
+    @Override
+    protected boolean startThreads() {
+        try {
+            for (int i = 0; i < MAX_SELECTOR_THREADS; ++i) {
+                selectorThreads.add(new TrackingSelectorThread(ACCEPT_QUEUE_LEN));
+            }
+            acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
+                                            createSelectorThreadLoadBalancer(selectorThreads));
+            selectorThreads.forEach(Thread::start);
+            acceptThread.start();
+            return true;
+        } catch (IOException e) {
+            log.error("Failed to start threads!", e);
+            return false;
+        }
+    }
+
+    @Override
+    protected void joinThreads() throws InterruptedException {
+        // Wait until the io threads exit.
+        acceptThread.join();
+        for (TThreadedSelectorServer.SelectorThread thread : selectorThreads) {
+            thread.join();
+        }
+    }
+
+    @Override
+    public void stop() {
+        stopped_ = true;
+        // Stop queuing connect attempts asap.
+        stopListening();
+        if (acceptThread != null) {
+            acceptThread.wakeupSelector();
+        }
+        if (selectorThreads != null) {
+            selectorThreads.stream()
+                    .filter(thread -> thread != null)
+                    .forEach(TrackingSelectorThread::wakeupSelector);
+        }
+    }
+
+    /**
+     * Selector thread that records the remote address of each connection it creates a frame buffer for.
+     */
+    private class TrackingSelectorThread extends TThreadedSelectorServer.SelectorThread {
+
+        TrackingSelectorThread(int maxPendingAccepts) throws IOException {
+            super(maxPendingAccepts);
+        }
+
+        @Override
+        protected FrameBuffer createFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
+                                                AbstractSelectThread selectThread) {
+            TrackingFrameBuffer frameBuffer = new TrackingFrameBuffer(trans, selectionKey, selectThread);
+            if (trans instanceof TNonblockingSocket) {
+                try {
+                    SocketChannel socketChannel = ((TNonblockingSocket) trans).getSocketChannel();
+                    InetAddress addr = ((InetSocketAddress) socketChannel.getRemoteAddress()).getAddress();
+                    clientAddresses.put(frameBuffer.getInputFramedTransport(), addr);
+                } catch (IOException e) {
+                    // Nothing to clean up: the frame buffer was just created, so it has no entry in the map yet.
+                    log.warn("Exception while tracking client address", e);
+                }
+            } else {
+                log.warn("Unknown TNonblockingTransport instance: {}", trans.getClass().getName());
+            }
+            return frameBuffer;
+        }
+    }
+
+    /**
+     * Frame buffer that exposes its input transport and stops tracking the client address once closed.
+     */
+    private class TrackingFrameBuffer extends FrameBuffer {
+
+        TrackingFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
+                            AbstractSelectThread selectThread) {
+            super(trans, selectionKey, selectThread);
+        }
+
+        TTransport getInputFramedTransport() {
+            return this.inTrans_;
+        }
+
+        @Override
+        public void close() {
+            // Forget the client address, otherwise the map would grow with every connection ever accepted.
+            clientAddresses.remove(inTrans_);
+            super.close();
+        }
+    }
+}
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java
index 5409fae..4a63d21 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java
@@ -34,19 +34,18 @@
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TMultiplexedProtocol;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.onlab.util.ImmutableByteSequence;
-import org.onosproject.bmv2.api.service.Bmv2Controller;
 import org.onosproject.bmv2.api.runtime.Bmv2Device;
 import org.onosproject.bmv2.api.runtime.Bmv2DeviceAgent;
+import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
+import org.onosproject.bmv2.api.service.Bmv2Controller;
 import org.onosproject.bmv2.api.service.Bmv2DeviceListener;
 import org.onosproject.bmv2.api.service.Bmv2PacketListener;
-import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
+import org.onosproject.bmv2.thriftapi.BmConfig;
 import org.onosproject.bmv2.thriftapi.ControlPlaneService;
 import org.onosproject.bmv2.thriftapi.SimpleSwitch;
 import org.onosproject.bmv2.thriftapi.Standard;
@@ -55,6 +54,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
@@ -67,6 +67,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.bmv2.thriftapi.ControlPlaneService.Processor;
 
 /**
  * Default implementation of a BMv2 controller.
@@ -93,10 +94,10 @@
                     .removalListener(new ClientRemovalListener())
                     .build(new ClientLoader());
 
-    private final InternalTrackingProcessor trackingProcessor = new InternalTrackingProcessor();
+    private final TProcessor trackingProcessor = new TrackingProcessor();
 
     private final ExecutorService executorService = Executors
-            .newFixedThreadPool(16, groupedThreads("onos/bmv2", "controller", log));
+            .newFixedThreadPool(32, groupedThreads("onos/bmv2", "controller", log));
 
     private final Set<Bmv2DeviceListener> deviceListeners = new CopyOnWriteArraySet<>();
     private final Set<Bmv2PacketListener> packetListeners = new CopyOnWriteArraySet<>();
@@ -104,7 +105,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
 
-    private TThreadPoolServer thriftServer;
+    private Bmv2ControlPlaneThriftServer server;
 
     // TODO: make it configurable trough component config
     private int serverPort = DEFAULT_PORT;
@@ -124,12 +125,9 @@
 
     private void startServer(int port) {
         try {
-            TServerTransport transport = new TServerSocket(port);
             log.info("Starting server on port {}...", port);
-            this.thriftServer = new TThreadPoolServer(new TThreadPoolServer.Args(transport)
-                                                              .processor(trackingProcessor)
-                                                              .executorService(executorService));
-            executorService.execute(thriftServer::serve);
+            this.server = new Bmv2ControlPlaneThriftServer(port, trackingProcessor, executorService);
+            executorService.execute(server::serve);
         } catch (TTransportException e) {
             log.error("Unable to start server", e);
         }
@@ -137,8 +135,9 @@
 
     private void stopServer() {
         // Stop the server if running...
-        if (thriftServer != null && !thriftServer.isServing()) {
-            thriftServer.stop();
+        if (server != null && server.isServing()) {
+            server.setShouldStop(true);
+            server.stop();
         }
         try {
             executorService.shutdown();
@@ -162,8 +161,11 @@
     @Override
     public boolean isReacheable(DeviceId deviceId) {
         try {
-            return getAgent(deviceId).ping();
-        } catch (Bmv2RuntimeException e) {
+            Bmv2DeviceThriftClient client = (Bmv2DeviceThriftClient) getAgent(deviceId);
+            BmConfig config = client.standardClient.bm_mgmt_get_info();
+            // The BMv2 instance running at this thrift IP and port might have a different BMv2 internal ID.
+            return config.getDevice_id() == Integer.parseInt(deviceId.uri().getFragment());
+        } catch (Bmv2RuntimeException | TException | NumberFormatException e) {
             return false;
         }
     }
@@ -213,15 +215,15 @@
     }
 
     /**
-     * Handles Thrift calls from BMv2 devices using registered listeners.
+     * Handles requests from BMv2 devices using the registered listeners.
      */
-    private final class InternalServiceHandler implements ControlPlaneService.Iface {
+    private final class ServiceHandler implements ControlPlaneService.Iface {
 
-        private final TSocket socket;
+        private final InetAddress clientAddress;
         private Bmv2Device remoteDevice;
 
-        private InternalServiceHandler(TSocket socket) {
-            this.socket = socket;
+        ServiceHandler(InetAddress clientAddress) {
+            this.clientAddress = clientAddress;
         }
 
         @Override
@@ -231,20 +233,19 @@
 
         @Override
         public void hello(int thriftServerPort, int deviceId, int instanceId, String jsonConfigMd5) {
-            // Locally note the remote device for future uses.
-            String host = socket.getSocket().getInetAddress().getHostAddress();
+            // Store a reference to the remote device for future uses.
+            String host = clientAddress.getHostAddress();
             remoteDevice = new Bmv2Device(host, thriftServerPort, deviceId);
 
             if (deviceListeners.size() == 0) {
                 log.debug("Received hello, but there's no listener registered.");
             } else {
-                deviceListeners.forEach(
-                        l -> executorService.execute(() -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5)));
+                deviceListeners.forEach(l -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5));
             }
         }
 
         @Override
-        public void packet_in(int port, ByteBuffer packet) {
+        public void packet_in(int port, ByteBuffer data, int dataLength) {
             if (remoteDevice == null) {
                 log.debug("Received packet-in, but the remote device is still unknown. Need a hello first...");
                 return;
@@ -253,41 +254,47 @@
             if (packetListeners.size() == 0) {
                 log.debug("Received packet-in, but there's no listener registered.");
             } else {
-                packetListeners.forEach(
-                        l -> executorService.execute(() -> l.handlePacketIn(remoteDevice,
-                                                                            port,
-                                                                            ImmutableByteSequence.copyFrom(packet))));
+                // The length is provided by the remote device: never trust it blindly when sizing the copy.
+                if (dataLength < 0 || dataLength > data.remaining()) {
+                    log.warn("Received packet-in with invalid data length {}, dropping...", dataLength);
+                    return;
+                }
+                byte[] bytes = new byte[dataLength];
+                data.get(bytes);
+                ImmutableByteSequence pkt = ImmutableByteSequence.copyFrom(bytes);
+                packetListeners.forEach(l -> l.handlePacketIn(remoteDevice, port, pkt));
             }
         }
     }
 
     /**
-     * Thrift Processor decorator. This class is needed in order to have access to the socket when handling a call.
-     * Socket is needed to get the IP address of the client originating the call (see InternalServiceHandler.hello())
+     * Decorator of a Thrift processor needed in order to keep track of the client's IP address that originated the
+     * request.
      */
-    private final class InternalTrackingProcessor implements TProcessor {
+    private final class TrackingProcessor implements TProcessor {
 
-        // Map sockets to processors.
-        // TODO: implement it as a cache so unused sockets are expired automatically
-        private final ConcurrentMap<TSocket, ControlPlaneService.Processor<InternalServiceHandler>> processors =
-                Maps.newConcurrentMap();
+        // Map transports to processors. TODO: entries are never removed, expire them when the client disconnects.
+        private final ConcurrentMap<TTransport, Processor<ServiceHandler>> processors = Maps.newConcurrentMap();
 
         @Override
         public boolean process(final TProtocol in, final TProtocol out) throws TException {
-            // Get the socket for this request.
-            TSocket socket = (TSocket) in.getTransport();
-            // Get or create a processor for this socket
-            ControlPlaneService.Processor<InternalServiceHandler> processor = processors.computeIfAbsent(socket, s -> {
-                InternalServiceHandler handler = new InternalServiceHandler(s);
-                return new ControlPlaneService.Processor<>(handler);
-            });
-            // Delegate to the processor we are decorating.
-            return processor.process(in, out);
+            // Get the client address for this request.
+            InetAddress clientAddress = server.getClientAddress((TFramedTransport) in.getTransport());
+            if (clientAddress != null) {
+                // Get or create a processor for this input transport, i.e. the client on the other side.
+                Processor<ServiceHandler> processor = processors.computeIfAbsent(
+                        in.getTransport(), t -> new Processor<>(new ServiceHandler(clientAddress)));
+                // Delegate to the processor we are decorating.
+                return processor.process(in, out);
+            } else {
+                log.warn("Unable to retrieve client IP address of incoming request");
+                return false;
+            }
         }
     }
 
     /**
-     * Transport/client cache loader.
+     * Cache loader of BMv2 Thrift clients.
      */
     private class ClientLoader extends CacheLoader<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> {
 
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
index c6f3d04..e077b6d 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
@@ -171,6 +171,17 @@
         return defaultContext;
     }
 
+    @Override
+    public void setDefaultContext(DeviceId deviceId) {
+        contexts.put(deviceId, defaultContext);
+        if (mastershipService.getMasterFor(deviceId) == null) {
+            // Checking for who is the master here is ugly but necessary, as this method is called by Bmv2DeviceProvider
+            // prior to master election. A solution could be to use a separate leadership contest instead of the
+            // mastership service.
+            triggerConfigCheck(deviceId, defaultContext);
+        }
+    }
+
     private void configCheck(DeviceId deviceId, Bmv2DeviceContext storedContext) {
         if (storedContext == null) {
             return;
@@ -206,14 +217,14 @@
     }
 
     private void triggerConfigCheck(DeviceId deviceId, Bmv2DeviceContext context) {
-        if (mastershipService.isLocalMaster(deviceId)) {
-            scheduledExecutor.schedule(() -> configCheck(deviceId, context), 0, TimeUnit.SECONDS);
-        }
+        scheduledExecutor.schedule(() -> configCheck(deviceId, context), 0, TimeUnit.SECONDS);
     }
 
     private void checkDevices() {
         deviceService.getAvailableDevices().forEach(device -> {
-            triggerConfigCheck(device.id(), getContext(device.id()));
+            if (mastershipService.isLocalMaster(device.id())) {
+                triggerConfigCheck(device.id(), getContext(device.id()));
+            }
         });
     }
 
@@ -235,8 +246,10 @@
         public void event(MapEvent<DeviceId, Bmv2DeviceContext> event) {
             DeviceId deviceId = event.key();
             if (event.type().equals(INSERT) || event.type().equals(UPDATE)) {
-                log.trace("Context {} for {}", event.type().name(), deviceId);
-                triggerConfigCheck(deviceId, event.newValue().value());
+                if (mastershipService.isLocalMaster(deviceId)) {
+                    log.trace("Context {} for {}", event.type().name(), deviceId);
+                    triggerConfigCheck(deviceId, event.newValue().value());
+                }
             }
         }
     }
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java
index e9e7faa..5bd5228 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java
@@ -68,7 +68,7 @@
     // See: https://github.com/p4lang/behavioral-model/blob/master/modules/bm_sim/include/bm_sim/context.h
     private static final int CONTEXT_ID = 0;
 
-    private final Standard.Iface standardClient;
+    protected final Standard.Iface standardClient;
     private final SimpleSwitch.Iface simpleSwitchClient;
     private final TTransport transport;
     private final DeviceId deviceId;
@@ -418,6 +418,7 @@
 
         try {
             standardClient.bm_swap_configs();
+            simpleSwitchClient.force_swap();
             log.debug("JSON config swapped! > deviceId={}", deviceId);
         } catch (TException e) {
             log.debug("Exception while swapping JSON config: {} > deviceId={}", e, deviceId);
diff --git a/protocols/bmv2/thrift-api/pom.xml b/protocols/bmv2/thrift-api/pom.xml
index 7b9bd6b..4f4e427 100644
--- a/protocols/bmv2/thrift-api/pom.xml
+++ b/protocols/bmv2/thrift-api/pom.xml
@@ -32,9 +32,9 @@
 
     <properties>
         <!-- BMv2 Commit ID and Thrift version -->
-        <bmv2.commit>024aa03e3b52f8d32c26774511e8e5b1dc11ec65</bmv2.commit>
+        <bmv2.commit>8f675d0284e9e014f1b8ed502ba54e61d68108cf</bmv2.commit>
         <bmv2.thrift.version>0.9.3</bmv2.thrift.version>
-        <bmv2.baseurl>https://cdn.rawgit.com/opennetworkinglab/behavioral-model/${bmv2.commit}</bmv2.baseurl>
+        <bmv2.baseurl>https://cdn.rawgit.com/opennetworkinglab/onos-bmv2/${bmv2.commit}</bmv2.baseurl>
         <bmv2.thrift.javanamespace>org.onosproject.bmv2.thriftapi</bmv2.thrift.javanamespace>
         <bmv2.thrift.srcdir>${project.build.directory}/thrift-sources/${bmv2.commit}/</bmv2.thrift.srcdir>
         <thrift.exedir>${project.build.directory}/thrift-compiler/</thrift.exedir>