diff --git a/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java b/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java
index 077a294..ed15134 100644
--- a/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java
+++ b/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java
@@ -53,9 +53,14 @@
     void registerInterpreterClassLoader(Class<? extends Bmv2Interpreter> interpreterClass, ClassLoader loader);
 
     /**
-     * Returns a default context.
+     * Returns the default context.
      *
      * @return a BMv2 device context
      */
     Bmv2DeviceContext defaultContext();
+
+    /**
+     * Sets the default context for the given device.
+     */
+    void setDefaultContext(DeviceId deviceId);
 }
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControlPlaneThriftServer.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControlPlaneThriftServer.java
new file mode 100644
index 0000000..bf44423
--- /dev/null
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControlPlaneThriftServer.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.bmv2.ctl;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * A Thrift TThreadedSelectorServer that keeps track of the clients' IP address.
+ */
+final class Bmv2ControlPlaneThriftServer extends TThreadedSelectorServer {
+
+    private static final int MAX_WORKER_THREADS = 20;
+    private static final int MAX_SELECTOR_THREADS = 4;
+    private static final int ACCEPT_QUEUE_LEN = 8;
+
+    private final Map<TTransport, InetAddress> clientAddresses = Maps.newConcurrentMap();
+    private final Set<TrackingSelectorThread> selectorThreads = Sets.newHashSet();
+
+    private AcceptThread acceptThread;
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    /**
+     * Creates a new server.
+     *
+     * @param port            a listening port
+     * @param processor       a processor
+     * @param executorService an executor service
+     * @throws TTransportException
+     */
+    public Bmv2ControlPlaneThriftServer(int port, TProcessor processor, ExecutorService executorService)
+            throws TTransportException {
+        super(new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port))
+                      .workerThreads(MAX_WORKER_THREADS)
+                      .selectorThreads(MAX_SELECTOR_THREADS)
+                      .acceptQueueSizePerThread(ACCEPT_QUEUE_LEN)
+                      .executorService(executorService)
+                      .processor(processor));
+    }
+
+    /**
+     * Returns the IP address of the client associated with the given input framed transport.
+     *
+     * @param inputTransport a framed transport instance
+     * @return the IP address of the client or null
+     */
+    InetAddress getClientAddress(TFramedTransport inputTransport) {
+        return clientAddresses.get(inputTransport);
+    }
+
+    @Override
+    protected boolean startThreads() {
+        try {
+            for (int i = 0; i < MAX_SELECTOR_THREADS; ++i) {
+                selectorThreads.add(new TrackingSelectorThread(ACCEPT_QUEUE_LEN));
+            }
+            acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
+                                            createSelectorThreadLoadBalancer(selectorThreads));
+            selectorThreads.forEach(Thread::start);
+            acceptThread.start();
+            return true;
+        } catch (IOException e) {
+            log.error("Failed to start threads!", e);
+            return false;
+        }
+    }
+
+    @Override
+    protected void joinThreads() throws InterruptedException {
+        // Wait until the io threads exit.
+        acceptThread.join();
+        for (TThreadedSelectorServer.SelectorThread thread : selectorThreads) {
+            thread.join();
+        }
+    }
+
+    @Override
+    public void stop() {
+        stopped_ = true;
+        // Stop queuing connect attempts asap.
+        stopListening();
+        if (acceptThread != null) {
+            acceptThread.wakeupSelector();
+        }
+        if (selectorThreads != null) {
+            selectorThreads.stream()
+                    .filter(thread -> thread != null)
+                    .forEach(TrackingSelectorThread::wakeupSelector);
+        }
+    }
+
+    private class TrackingSelectorThread extends TThreadedSelectorServer.SelectorThread {
+
+        TrackingSelectorThread(int maxPendingAccepts) throws IOException {
+            super(maxPendingAccepts);
+        }
+
+        @Override
+        protected FrameBuffer createFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
+                                                AbstractSelectThread selectThread) {
+            TrackingFrameBuffer frameBuffer = new TrackingFrameBuffer(trans, selectionKey, selectThread);
+            if (trans instanceof TNonblockingSocket) {
+                try {
+                    SocketChannel socketChannel = ((TNonblockingSocket) trans).getSocketChannel();
+                    InetAddress addr = ((InetSocketAddress) socketChannel.getRemoteAddress()).getAddress();
+                    clientAddresses.put(frameBuffer.getInputFramedTransport(), addr);
+                } catch (IOException e) {
+                    log.warn("Exception while tracking client address", e);
+                    clientAddresses.remove(frameBuffer.getInputFramedTransport());
+                }
+            } else {
+                log.warn("Unknown TNonblockingTransport instance: {}", trans.getClass().getName());
+                clientAddresses.remove(frameBuffer.getInputFramedTransport());
+            }
+            return frameBuffer;
+        }
+    }
+
+    private class TrackingFrameBuffer extends FrameBuffer {
+
+        TrackingFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
+                            AbstractSelectThread selectThread) {
+            super(trans, selectionKey, selectThread);
+        }
+
+        TTransport getInputFramedTransport() {
+            return this.inTrans_;
+        }
+    }
+}
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java
index 5409fae..4a63d21 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java
@@ -34,19 +34,18 @@
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TMultiplexedProtocol;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.onlab.util.ImmutableByteSequence;
-import org.onosproject.bmv2.api.service.Bmv2Controller;
 import org.onosproject.bmv2.api.runtime.Bmv2Device;
 import org.onosproject.bmv2.api.runtime.Bmv2DeviceAgent;
+import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
+import org.onosproject.bmv2.api.service.Bmv2Controller;
 import org.onosproject.bmv2.api.service.Bmv2DeviceListener;
 import org.onosproject.bmv2.api.service.Bmv2PacketListener;
-import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
+import org.onosproject.bmv2.thriftapi.BmConfig;
 import org.onosproject.bmv2.thriftapi.ControlPlaneService;
 import org.onosproject.bmv2.thriftapi.SimpleSwitch;
 import org.onosproject.bmv2.thriftapi.Standard;
@@ -55,6 +54,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
@@ -67,6 +67,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.bmv2.thriftapi.ControlPlaneService.Processor;
 
 /**
  * Default implementation of a BMv2 controller.
@@ -93,10 +94,10 @@
                     .removalListener(new ClientRemovalListener())
                     .build(new ClientLoader());
 
-    private final InternalTrackingProcessor trackingProcessor = new InternalTrackingProcessor();
+    private final TProcessor trackingProcessor = new TrackingProcessor();
 
     private final ExecutorService executorService = Executors
-            .newFixedThreadPool(16, groupedThreads("onos/bmv2", "controller", log));
+            .newFixedThreadPool(32, groupedThreads("onos/bmv2", "controller", log));
 
     private final Set<Bmv2DeviceListener> deviceListeners = new CopyOnWriteArraySet<>();
     private final Set<Bmv2PacketListener> packetListeners = new CopyOnWriteArraySet<>();
@@ -104,7 +105,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
 
-    private TThreadPoolServer thriftServer;
+    private Bmv2ControlPlaneThriftServer server;
 
     // TODO: make it configurable trough component config
     private int serverPort = DEFAULT_PORT;
@@ -124,12 +125,9 @@
 
     private void startServer(int port) {
         try {
-            TServerTransport transport = new TServerSocket(port);
             log.info("Starting server on port {}...", port);
-            this.thriftServer = new TThreadPoolServer(new TThreadPoolServer.Args(transport)
-                                                              .processor(trackingProcessor)
-                                                              .executorService(executorService));
-            executorService.execute(thriftServer::serve);
+            this.server = new Bmv2ControlPlaneThriftServer(port, trackingProcessor, executorService);
+            executorService.execute(server::serve);
         } catch (TTransportException e) {
             log.error("Unable to start server", e);
         }
@@ -137,8 +135,9 @@
 
     private void stopServer() {
         // Stop the server if running...
-        if (thriftServer != null && !thriftServer.isServing()) {
-            thriftServer.stop();
+        if (server != null && !server.isServing()) {
+            server.setShouldStop(true);
+            server.stop();
         }
         try {
             executorService.shutdown();
@@ -162,8 +161,11 @@
     @Override
     public boolean isReacheable(DeviceId deviceId) {
         try {
-            return getAgent(deviceId).ping();
-        } catch (Bmv2RuntimeException e) {
+            Bmv2DeviceThriftClient client = (Bmv2DeviceThriftClient) getAgent(deviceId);
+            BmConfig config = client.standardClient.bm_mgmt_get_info();
+            // The BMv2 instance running at this thrift IP and port might have a different BMv2 internal ID.
+            return config.getDevice_id() == Integer.valueOf(deviceId.uri().getFragment());
+        } catch (Bmv2RuntimeException | TException e) {
             return false;
         }
     }
@@ -213,15 +215,15 @@
     }
 
     /**
-     * Handles Thrift calls from BMv2 devices using registered listeners.
+     * Handles requests from BMv2 devices using the registered listeners.
      */
-    private final class InternalServiceHandler implements ControlPlaneService.Iface {
+    private final class ServiceHandler implements ControlPlaneService.Iface {
 
-        private final TSocket socket;
+        private final InetAddress clientAddress;
         private Bmv2Device remoteDevice;
 
-        private InternalServiceHandler(TSocket socket) {
-            this.socket = socket;
+        ServiceHandler(InetAddress clientAddress) {
+            this.clientAddress = clientAddress;
         }
 
         @Override
@@ -231,20 +233,19 @@
 
         @Override
         public void hello(int thriftServerPort, int deviceId, int instanceId, String jsonConfigMd5) {
-            // Locally note the remote device for future uses.
-            String host = socket.getSocket().getInetAddress().getHostAddress();
+            // Store a reference to the remote device for future uses.
+            String host = clientAddress.getHostAddress();
             remoteDevice = new Bmv2Device(host, thriftServerPort, deviceId);
 
             if (deviceListeners.size() == 0) {
                 log.debug("Received hello, but there's no listener registered.");
             } else {
-                deviceListeners.forEach(
-                        l -> executorService.execute(() -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5)));
+                deviceListeners.forEach(l -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5));
             }
         }
 
         @Override
-        public void packet_in(int port, ByteBuffer packet) {
+        public void packet_in(int port, ByteBuffer data, int dataLength) {
             if (remoteDevice == null) {
                 log.debug("Received packet-in, but the remote device is still unknown. Need a hello first...");
                 return;
@@ -253,41 +254,42 @@
             if (packetListeners.size() == 0) {
                 log.debug("Received packet-in, but there's no listener registered.");
             } else {
-                packetListeners.forEach(
-                        l -> executorService.execute(() -> l.handlePacketIn(remoteDevice,
-                                                                            port,
-                                                                            ImmutableByteSequence.copyFrom(packet))));
+                byte[] bytes = new byte[dataLength];
+                data.get(bytes);
+                ImmutableByteSequence pkt = ImmutableByteSequence.copyFrom(bytes);
+                packetListeners.forEach(l -> l.handlePacketIn(remoteDevice, port, pkt));
             }
         }
     }
 
     /**
-     * Thrift Processor decorator. This class is needed in order to have access to the socket when handling a call.
-     * Socket is needed to get the IP address of the client originating the call (see InternalServiceHandler.hello())
+     * Decorator of a Thrift processor needed in order to keep track of the client's IP address that originated the
+     * request.
      */
-    private final class InternalTrackingProcessor implements TProcessor {
+    private final class TrackingProcessor implements TProcessor {
 
-        // Map sockets to processors.
-        // TODO: implement it as a cache so unused sockets are expired automatically
-        private final ConcurrentMap<TSocket, ControlPlaneService.Processor<InternalServiceHandler>> processors =
-                Maps.newConcurrentMap();
+        // Map transports to processors.
+        private final ConcurrentMap<TTransport, Processor<ServiceHandler>> processors = Maps.newConcurrentMap();
 
         @Override
         public boolean process(final TProtocol in, final TProtocol out) throws TException {
-            // Get the socket for this request.
-            TSocket socket = (TSocket) in.getTransport();
-            // Get or create a processor for this socket
-            ControlPlaneService.Processor<InternalServiceHandler> processor = processors.computeIfAbsent(socket, s -> {
-                InternalServiceHandler handler = new InternalServiceHandler(s);
-                return new ControlPlaneService.Processor<>(handler);
-            });
-            // Delegate to the processor we are decorating.
-            return processor.process(in, out);
+            // Get the client address for this request.
+            InetAddress clientAddress = server.getClientAddress((TFramedTransport) in.getTransport());
+            if (clientAddress != null) {
+                // Get or create a processor for this input transport, i.e. the client on the other side.
+                Processor<ServiceHandler> processor = processors.computeIfAbsent(
+                        in.getTransport(), t -> new Processor<>(new ServiceHandler(clientAddress)));
+                // Delegate to the processor we are decorating.
+                return processor.process(in, out);
+            } else {
+                log.warn("Unable to retrieve client IP address of incoming request");
+                return false;
+            }
         }
     }
 
     /**
-     * Transport/client cache loader.
+     * Cache loader of BMv2 Thrift clients.
      */
     private class ClientLoader extends CacheLoader<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> {
 
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
index c6f3d04..e077b6d 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
@@ -171,6 +171,17 @@
         return defaultContext;
     }
 
+    @Override
+    public void setDefaultContext(DeviceId deviceId) {
+        Versioned<Bmv2DeviceContext> previous = contexts.put(deviceId, defaultContext);
+        if (mastershipService.getMasterFor(deviceId) == null) {
+            // Checking for who is the master here is ugly but necessary, as this method is called by Bmv2DeviceProvider
+            // prior to master election. A solution could be to use a separate leadership contest instead of the
+            // mastership service.
+            triggerConfigCheck(deviceId, defaultContext);
+        }
+    }
+
     private void configCheck(DeviceId deviceId, Bmv2DeviceContext storedContext) {
         if (storedContext == null) {
             return;
@@ -206,14 +217,14 @@
     }
 
     private void triggerConfigCheck(DeviceId deviceId, Bmv2DeviceContext context) {
-        if (mastershipService.isLocalMaster(deviceId)) {
             scheduledExecutor.schedule(() -> configCheck(deviceId, context), 0, TimeUnit.SECONDS);
-        }
     }
 
     private void checkDevices() {
         deviceService.getAvailableDevices().forEach(device -> {
-            triggerConfigCheck(device.id(), getContext(device.id()));
+            if (mastershipService.isLocalMaster(device.id())) {
+                triggerConfigCheck(device.id(), getContext(device.id()));
+            }
         });
     }
 
@@ -235,8 +246,10 @@
         public void event(MapEvent<DeviceId, Bmv2DeviceContext> event) {
             DeviceId deviceId = event.key();
             if (event.type().equals(INSERT) || event.type().equals(UPDATE)) {
-                log.trace("Context {} for {}", event.type().name(), deviceId);
-                triggerConfigCheck(deviceId, event.newValue().value());
+                if (mastershipService.isLocalMaster(deviceId)) {
+                    log.trace("Context {} for {}", event.type().name(), deviceId);
+                    triggerConfigCheck(deviceId, event.newValue().value());
+                }
             }
         }
     }
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java
index e9e7faa..5bd5228 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java
@@ -68,7 +68,7 @@
     // See: https://github.com/p4lang/behavioral-model/blob/master/modules/bm_sim/include/bm_sim/context.h
     private static final int CONTEXT_ID = 0;
 
-    private final Standard.Iface standardClient;
+    protected final Standard.Iface standardClient;
     private final SimpleSwitch.Iface simpleSwitchClient;
     private final TTransport transport;
     private final DeviceId deviceId;
@@ -418,6 +418,7 @@
 
         try {
             standardClient.bm_swap_configs();
+            simpleSwitchClient.force_swap();
             log.debug("JSON config swapped! > deviceId={}", deviceId);
         } catch (TException e) {
             log.debug("Exception while swapping JSON config: {} > deviceId={}", e, deviceId);
diff --git a/protocols/bmv2/thrift-api/pom.xml b/protocols/bmv2/thrift-api/pom.xml
index 7b9bd6b..4f4e427 100644
--- a/protocols/bmv2/thrift-api/pom.xml
+++ b/protocols/bmv2/thrift-api/pom.xml
@@ -32,9 +32,9 @@
 
     <properties>
         <!-- BMv2 Commit ID and Thrift version -->
-        <bmv2.commit>024aa03e3b52f8d32c26774511e8e5b1dc11ec65</bmv2.commit>
+        <bmv2.commit>8f675d0284e9e014f1b8ed502ba54e61d68108cf</bmv2.commit>
         <bmv2.thrift.version>0.9.3</bmv2.thrift.version>
-        <bmv2.baseurl>https://cdn.rawgit.com/opennetworkinglab/behavioral-model/${bmv2.commit}</bmv2.baseurl>
+        <bmv2.baseurl>https://cdn.rawgit.com/opennetworkinglab/onos-bmv2/${bmv2.commit}</bmv2.baseurl>
         <bmv2.thrift.javanamespace>org.onosproject.bmv2.thriftapi</bmv2.thrift.javanamespace>
         <bmv2.thrift.srcdir>${project.build.directory}/thrift-sources/${bmv2.commit}/</bmv2.thrift.srcdir>
         <thrift.exedir>${project.build.directory}/thrift-compiler/</thrift.exedir>
diff --git a/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java b/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
index fda7aad..d4ab740 100644
--- a/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
+++ b/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
@@ -20,10 +20,7 @@
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.onlab.util.Timer;
+import org.onlab.util.SharedScheduledExecutors;
 import org.onosproject.bmv2.api.runtime.Bmv2Device;
 import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
 import org.onosproject.bmv2.api.service.Bmv2Controller;
@@ -34,6 +31,7 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.incubator.net.config.basics.ConfigException;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
@@ -55,14 +53,20 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.bmv2.api.runtime.Bmv2Device.*;
+import static org.onosproject.net.Device.Type.SWITCH;
 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
 import static org.onosproject.provider.bmv2.device.impl.Bmv2PortStatisticsGetter.getPortStatistics;
 import static org.onosproject.provider.bmv2.device.impl.Bmv2PortStatisticsGetter.initCounters;
@@ -76,20 +80,22 @@
 
     private static final String APP_NAME = "org.onosproject.bmv2";
 
-    private static final int POLL_INTERVAL = 5_000; // milliseconds
+    private static final int POLL_PERIOD = 5_000; // milliseconds
 
     private final Logger log = getLogger(this.getClass());
 
     private final ExecutorService executorService = Executors
             .newFixedThreadPool(16, groupedThreads("onos/bmv2", "device-discovery", log));
 
+    private final ScheduledExecutorService scheduledExecutorService = SharedScheduledExecutors.getPoolThreadExecutor();
+
     private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
 
     private final ConfigFactory cfgFactory = new InternalConfigFactory();
 
-    private final ConcurrentMap<DeviceId, DeviceDescription> activeDevices = Maps.newConcurrentMap();
+    private final Map<DeviceId, DeviceDescription> lastDescriptions = Maps.newHashMap();
 
-    private final DevicePoller devicePoller = new DevicePoller();
+    private final ConcurrentMap<DeviceId, Lock> deviceLocks = Maps.newConcurrentMap();
 
     private final InternalDeviceListener deviceListener = new InternalDeviceListener();
 
@@ -103,6 +109,9 @@
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected Bmv2Controller controller;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -112,6 +121,7 @@
     protected Bmv2TableEntryService tableEntryService;
 
     private ApplicationId appId;
+    private ScheduledFuture<?> poller;
 
     /**
      * Creates a Bmv2 device provider with the supplied identifier.
@@ -126,19 +136,24 @@
         netCfgService.registerConfigFactory(cfgFactory);
         netCfgService.addListener(cfgListener);
         controller.addDeviceListener(deviceListener);
-        devicePoller.start();
+        if (poller != null) {
+            poller.cancel(false);
+        }
+        poller = scheduledExecutorService.scheduleAtFixedRate(this::pollDevices, 1_000, POLL_PERIOD, MILLISECONDS);
         super.activate();
     }
 
     @Override
     protected void deactivate() {
-        devicePoller.stop();
+        if (poller != null) {
+            poller.cancel(false);
+        }
         controller.removeDeviceListener(deviceListener);
         try {
-            activeDevices.forEach((did, value) -> {
+            lastDescriptions.forEach((did, value) -> {
                 executorService.execute(() -> disconnectDevice(did));
             });
-            executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
+            executorService.awaitTermination(1000, MILLISECONDS);
         } catch (InterruptedException e) {
             log.error("Device discovery threads did not terminate");
         }
@@ -181,32 +196,43 @@
     }
 
     private void discoverDevice(DeviceId did) {
-        log.debug("Starting device discovery... deviceId={}", did);
-        activeDevices.compute(did, (k, lastDescription) -> {
+        // Serialize discovery for the same device.
+        Lock lock = deviceLocks.computeIfAbsent(did, k -> new ReentrantLock());
+        lock.lock();
+        try {
+            log.debug("Starting device discovery... deviceId={}", did);
+
+            if (contextService.getContext(did) == null) {
+                // Device is a first timer.
+                log.info("Setting DEFAULT context for {}", did);
+                // It is important to do this before creating the device in the core
+                // so other services won't find a null context.
+                contextService.setDefaultContext(did);
+                // Abort discovery, we'll receive a new hello once the swap has been performed.
+                return;
+            }
+
+            DeviceDescription lastDescription = lastDescriptions.get(did);
             DeviceDescription thisDescription = getDeviceDescription(did);
+
             if (thisDescription != null) {
-                boolean descriptionChanged = lastDescription != null &&
-                                (!Objects.equals(thisDescription, lastDescription) ||
-                                        !Objects.equals(thisDescription.annotations(), lastDescription.annotations()));
+                boolean descriptionChanged = lastDescription == null ||
+                        (!Objects.equals(thisDescription, lastDescription) ||
+                                !Objects.equals(thisDescription.annotations(), lastDescription.annotations()));
                 if (descriptionChanged || !deviceService.isAvailable(did)) {
-                    if (deviceService.getDevice(did) == null) {
-                        // Device is a first timer.
-                        log.info("Setting DEFAULT context for {}", did);
-                        // It is important to do this before connecting the device so other
-                        // services won't find a null context.
-                        contextService.setContext(did, contextService.defaultContext());
-                    }
                     resetDeviceState(did);
                     initPortCounters(did);
                     providerService.deviceConnected(did, thisDescription);
                     updatePortsAndStats(did);
                 }
-                return thisDescription;
+                lastDescriptions.put(did, thisDescription);
             } else {
                 log.warn("Unable to get device description for {}", did);
-                return lastDescription;
+                lastDescriptions.put(did, lastDescription);
             }
-        });
+        } finally {
+            lock.unlock();
+        }
     }
 
     private DeviceDescription getDeviceDescription(DeviceId did) {
@@ -269,11 +295,27 @@
     }
 
     private void disconnectDevice(DeviceId did) {
-        log.debug("Trying to disconnect device from core... deviceId={}", did);
-        if (deviceService.isAvailable(did)) {
-            providerService.deviceDisconnected(did);
+        log.debug("Disconnecting device from core... deviceId={}", did);
+        providerService.deviceDisconnected(did);
+        lastDescriptions.remove(did);
+    }
+
+    private void pollDevices() {
+        for (Device device: deviceService.getAvailableDevices(SWITCH)) {
+            if (device.id().uri().getScheme().equals(SCHEME) &&
+                    mastershipService.isLocalMaster(device.id())) {
+                executorService.execute(() -> pollingTask(device.id()));
+            }
         }
-        activeDevices.remove(did);
+    }
+
+    private void pollingTask(DeviceId deviceId) {
+        log.debug("Polling device {}...", deviceId);
+        if (isReachable(deviceId)) {
+            updatePortsAndStats(deviceId);
+        } else {
+            disconnectDevice(deviceId);
+        }
     }
 
     /**
@@ -332,50 +374,4 @@
             triggerProbe(device.asDeviceId());
         }
     }
-
-    /**
-     * Task that periodically trigger device probes to check for device status and update port information.
-     */
-    private class DevicePoller implements TimerTask {
-
-        private final HashedWheelTimer timer = Timer.getTimer();
-        private Timeout timeout;
-
-        @Override
-        public void run(Timeout tout) throws Exception {
-            if (tout.isCancelled()) {
-                return;
-            }
-            activeDevices.keySet()
-                    .stream()
-                    // Filter out devices not yet created in the core.
-                    .filter(did -> deviceService.getDevice(did) != null)
-                    .forEach(did -> executorService.execute(() -> pollingTask(did)));
-            tout.getTimer().newTimeout(this, POLL_INTERVAL, TimeUnit.MILLISECONDS);
-        }
-
-        private void pollingTask(DeviceId deviceId) {
-            if (isReachable(deviceId)) {
-                updatePortsAndStats(deviceId);
-            } else {
-                disconnectDevice(deviceId);
-            }
-        }
-
-        /**
-         * Starts the collector.
-         */
-        synchronized void start() {
-            log.info("Starting device poller...");
-            timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
-        }
-
-        /**
-         * Stops the collector.
-         */
-        synchronized void stop() {
-            log.info("Stopping device poller...");
-            timeout.cancel();
-        }
-    }
 }
