BMv2 performance improvements
- Implemented a non-blocking Thrift server for the controller (before it
was limiting the number of active connections)
- Improved configuration swap times by forcing it
- Minor bugfixes and polishing
- Update onos-bmv2 repo URL in thrift-api pom.xml
Change-Id: I13b61f5aa22558c395768e3b445f302b20c5bd33
diff --git a/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java b/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java
index 077a294..ed15134 100644
--- a/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java
+++ b/protocols/bmv2/api/src/main/java/org/onosproject/bmv2/api/service/Bmv2DeviceContextService.java
@@ -53,9 +53,14 @@
void registerInterpreterClassLoader(Class<? extends Bmv2Interpreter> interpreterClass, ClassLoader loader);
/**
- * Returns a default context.
+ * Returns the default context.
*
* @return a BMv2 device context
*/
Bmv2DeviceContext defaultContext();
+
+ /**
+ * Sets the default context for the given device.
+ */
+ void setDefaultContext(DeviceId deviceId);
}
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControlPlaneThriftServer.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControlPlaneThriftServer.java
new file mode 100644
index 0000000..bf44423
--- /dev/null
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControlPlaneThriftServer.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.bmv2.ctl;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * A Thrift TThreadedSelectorServer that keeps track of the clients' IP address.
+ */
+final class Bmv2ControlPlaneThriftServer extends TThreadedSelectorServer {
+
+ private static final int MAX_WORKER_THREADS = 20;
+ private static final int MAX_SELECTOR_THREADS = 4;
+ private static final int ACCEPT_QUEUE_LEN = 8;
+
+ private final Map<TTransport, InetAddress> clientAddresses = Maps.newConcurrentMap();
+ private final Set<TrackingSelectorThread> selectorThreads = Sets.newHashSet();
+
+ private AcceptThread acceptThread;
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ /**
+ * Creates a new server.
+ *
+ * @param port a listening port
+ * @param processor a processor
+ * @param executorService an executor service
+ * @throws TTransportException
+ */
+ public Bmv2ControlPlaneThriftServer(int port, TProcessor processor, ExecutorService executorService)
+ throws TTransportException {
+ super(new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port))
+ .workerThreads(MAX_WORKER_THREADS)
+ .selectorThreads(MAX_SELECTOR_THREADS)
+ .acceptQueueSizePerThread(ACCEPT_QUEUE_LEN)
+ .executorService(executorService)
+ .processor(processor));
+ }
+
+ /**
+ * Returns the IP address of the client associated with the given input framed transport.
+ *
+ * @param inputTransport a framed transport instance
+ * @return the IP address of the client or null
+ */
+ InetAddress getClientAddress(TFramedTransport inputTransport) {
+ return clientAddresses.get(inputTransport);
+ }
+
+ @Override
+ protected boolean startThreads() {
+ try {
+ for (int i = 0; i < MAX_SELECTOR_THREADS; ++i) {
+ selectorThreads.add(new TrackingSelectorThread(ACCEPT_QUEUE_LEN));
+ }
+ acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
+ createSelectorThreadLoadBalancer(selectorThreads));
+ selectorThreads.forEach(Thread::start);
+ acceptThread.start();
+ return true;
+ } catch (IOException e) {
+ log.error("Failed to start threads!", e);
+ return false;
+ }
+ }
+
+ @Override
+ protected void joinThreads() throws InterruptedException {
+ // Wait until the io threads exit.
+ acceptThread.join();
+ for (TThreadedSelectorServer.SelectorThread thread : selectorThreads) {
+ thread.join();
+ }
+ }
+
+ @Override
+ public void stop() {
+ stopped_ = true;
+ // Stop queuing connect attempts asap.
+ stopListening();
+ if (acceptThread != null) {
+ acceptThread.wakeupSelector();
+ }
+ if (selectorThreads != null) {
+ selectorThreads.stream()
+ .filter(thread -> thread != null)
+ .forEach(TrackingSelectorThread::wakeupSelector);
+ }
+ }
+
+ private class TrackingSelectorThread extends TThreadedSelectorServer.SelectorThread {
+
+ TrackingSelectorThread(int maxPendingAccepts) throws IOException {
+ super(maxPendingAccepts);
+ }
+
+ @Override
+ protected FrameBuffer createFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
+ AbstractSelectThread selectThread) {
+ TrackingFrameBuffer frameBuffer = new TrackingFrameBuffer(trans, selectionKey, selectThread);
+ if (trans instanceof TNonblockingSocket) {
+ try {
+ SocketChannel socketChannel = ((TNonblockingSocket) trans).getSocketChannel();
+ InetAddress addr = ((InetSocketAddress) socketChannel.getRemoteAddress()).getAddress();
+ clientAddresses.put(frameBuffer.getInputFramedTransport(), addr);
+ } catch (IOException e) {
+ log.warn("Exception while tracking client address", e);
+ clientAddresses.remove(frameBuffer.getInputFramedTransport());
+ }
+ } else {
+ log.warn("Unknown TNonblockingTransport instance: {}", trans.getClass().getName());
+ clientAddresses.remove(frameBuffer.getInputFramedTransport());
+ }
+ return frameBuffer;
+ }
+ }
+
+ private class TrackingFrameBuffer extends FrameBuffer {
+
+ TrackingFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey,
+ AbstractSelectThread selectThread) {
+ super(trans, selectionKey, selectThread);
+ }
+
+ TTransport getInputFramedTransport() {
+ return this.inTrans_;
+ }
+ }
+}
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java
index 5409fae..4a63d21 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2ControllerImpl.java
@@ -34,19 +34,18 @@
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.onlab.util.ImmutableByteSequence;
-import org.onosproject.bmv2.api.service.Bmv2Controller;
import org.onosproject.bmv2.api.runtime.Bmv2Device;
import org.onosproject.bmv2.api.runtime.Bmv2DeviceAgent;
+import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
+import org.onosproject.bmv2.api.service.Bmv2Controller;
import org.onosproject.bmv2.api.service.Bmv2DeviceListener;
import org.onosproject.bmv2.api.service.Bmv2PacketListener;
-import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
+import org.onosproject.bmv2.thriftapi.BmConfig;
import org.onosproject.bmv2.thriftapi.ControlPlaneService;
import org.onosproject.bmv2.thriftapi.SimpleSwitch;
import org.onosproject.bmv2.thriftapi.Standard;
@@ -55,6 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
@@ -67,6 +67,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.bmv2.thriftapi.ControlPlaneService.Processor;
/**
* Default implementation of a BMv2 controller.
@@ -93,10 +94,10 @@
.removalListener(new ClientRemovalListener())
.build(new ClientLoader());
- private final InternalTrackingProcessor trackingProcessor = new InternalTrackingProcessor();
+ private final TProcessor trackingProcessor = new TrackingProcessor();
private final ExecutorService executorService = Executors
- .newFixedThreadPool(16, groupedThreads("onos/bmv2", "controller", log));
+ .newFixedThreadPool(32, groupedThreads("onos/bmv2", "controller", log));
private final Set<Bmv2DeviceListener> deviceListeners = new CopyOnWriteArraySet<>();
private final Set<Bmv2PacketListener> packetListeners = new CopyOnWriteArraySet<>();
@@ -104,7 +105,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
- private TThreadPoolServer thriftServer;
+ private Bmv2ControlPlaneThriftServer server;
// TODO: make it configurable trough component config
private int serverPort = DEFAULT_PORT;
@@ -124,12 +125,9 @@
private void startServer(int port) {
try {
- TServerTransport transport = new TServerSocket(port);
log.info("Starting server on port {}...", port);
- this.thriftServer = new TThreadPoolServer(new TThreadPoolServer.Args(transport)
- .processor(trackingProcessor)
- .executorService(executorService));
- executorService.execute(thriftServer::serve);
+ this.server = new Bmv2ControlPlaneThriftServer(port, trackingProcessor, executorService);
+ executorService.execute(server::serve);
} catch (TTransportException e) {
log.error("Unable to start server", e);
}
@@ -137,8 +135,9 @@
private void stopServer() {
// Stop the server if running...
- if (thriftServer != null && !thriftServer.isServing()) {
- thriftServer.stop();
+ if (server != null && !server.isServing()) {
+ server.setShouldStop(true);
+ server.stop();
}
try {
executorService.shutdown();
@@ -162,8 +161,11 @@
@Override
public boolean isReacheable(DeviceId deviceId) {
try {
- return getAgent(deviceId).ping();
- } catch (Bmv2RuntimeException e) {
+ Bmv2DeviceThriftClient client = (Bmv2DeviceThriftClient) getAgent(deviceId);
+ BmConfig config = client.standardClient.bm_mgmt_get_info();
+ // The BMv2 instance running at this thrift IP and port might have a different BMv2 internal ID.
+ return config.getDevice_id() == Integer.valueOf(deviceId.uri().getFragment());
+ } catch (Bmv2RuntimeException | TException e) {
return false;
}
}
@@ -213,15 +215,15 @@
}
/**
- * Handles Thrift calls from BMv2 devices using registered listeners.
+ * Handles requests from BMv2 devices using the registered listeners.
*/
- private final class InternalServiceHandler implements ControlPlaneService.Iface {
+ private final class ServiceHandler implements ControlPlaneService.Iface {
- private final TSocket socket;
+ private final InetAddress clientAddress;
private Bmv2Device remoteDevice;
- private InternalServiceHandler(TSocket socket) {
- this.socket = socket;
+ ServiceHandler(InetAddress clientAddress) {
+ this.clientAddress = clientAddress;
}
@Override
@@ -231,20 +233,19 @@
@Override
public void hello(int thriftServerPort, int deviceId, int instanceId, String jsonConfigMd5) {
- // Locally note the remote device for future uses.
- String host = socket.getSocket().getInetAddress().getHostAddress();
+ // Store a reference to the remote device for future uses.
+ String host = clientAddress.getHostAddress();
remoteDevice = new Bmv2Device(host, thriftServerPort, deviceId);
if (deviceListeners.size() == 0) {
log.debug("Received hello, but there's no listener registered.");
} else {
- deviceListeners.forEach(
- l -> executorService.execute(() -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5)));
+ deviceListeners.forEach(l -> l.handleHello(remoteDevice, instanceId, jsonConfigMd5));
}
}
@Override
- public void packet_in(int port, ByteBuffer packet) {
+ public void packet_in(int port, ByteBuffer data, int dataLength) {
if (remoteDevice == null) {
log.debug("Received packet-in, but the remote device is still unknown. Need a hello first...");
return;
@@ -253,41 +254,42 @@
if (packetListeners.size() == 0) {
log.debug("Received packet-in, but there's no listener registered.");
} else {
- packetListeners.forEach(
- l -> executorService.execute(() -> l.handlePacketIn(remoteDevice,
- port,
- ImmutableByteSequence.copyFrom(packet))));
+ byte[] bytes = new byte[dataLength];
+ data.get(bytes);
+ ImmutableByteSequence pkt = ImmutableByteSequence.copyFrom(bytes);
+ packetListeners.forEach(l -> l.handlePacketIn(remoteDevice, port, pkt));
}
}
}
/**
- * Thrift Processor decorator. This class is needed in order to have access to the socket when handling a call.
- * Socket is needed to get the IP address of the client originating the call (see InternalServiceHandler.hello())
+ * Decorator of a Thrift processor needed in order to keep track of the client's IP address that originated the
+ * request.
*/
- private final class InternalTrackingProcessor implements TProcessor {
+ private final class TrackingProcessor implements TProcessor {
- // Map sockets to processors.
- // TODO: implement it as a cache so unused sockets are expired automatically
- private final ConcurrentMap<TSocket, ControlPlaneService.Processor<InternalServiceHandler>> processors =
- Maps.newConcurrentMap();
+ // Map transports to processors.
+ private final ConcurrentMap<TTransport, Processor<ServiceHandler>> processors = Maps.newConcurrentMap();
@Override
public boolean process(final TProtocol in, final TProtocol out) throws TException {
- // Get the socket for this request.
- TSocket socket = (TSocket) in.getTransport();
- // Get or create a processor for this socket
- ControlPlaneService.Processor<InternalServiceHandler> processor = processors.computeIfAbsent(socket, s -> {
- InternalServiceHandler handler = new InternalServiceHandler(s);
- return new ControlPlaneService.Processor<>(handler);
- });
- // Delegate to the processor we are decorating.
- return processor.process(in, out);
+ // Get the client address for this request.
+ InetAddress clientAddress = server.getClientAddress((TFramedTransport) in.getTransport());
+ if (clientAddress != null) {
+ // Get or create a processor for this input transport, i.e. the client on the other side.
+ Processor<ServiceHandler> processor = processors.computeIfAbsent(
+ in.getTransport(), t -> new Processor<>(new ServiceHandler(clientAddress)));
+ // Delegate to the processor we are decorating.
+ return processor.process(in, out);
+ } else {
+ log.warn("Unable to retrieve client IP address of incoming request");
+ return false;
+ }
}
}
/**
- * Transport/client cache loader.
+ * Cache loader of BMv2 Thrift clients.
*/
private class ClientLoader extends CacheLoader<DeviceId, Pair<TTransport, Bmv2DeviceThriftClient>> {
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
index c6f3d04..e077b6d 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceContextServiceImpl.java
@@ -171,6 +171,17 @@
return defaultContext;
}
+ @Override
+ public void setDefaultContext(DeviceId deviceId) {
+ Versioned<Bmv2DeviceContext> previous = contexts.put(deviceId, defaultContext);
+ if (mastershipService.getMasterFor(deviceId) == null) {
+ // Checking for who is the master here is ugly but necessary, as this method is called by Bmv2DeviceProvider
+ // prior to master election. A solution could be to use a separate leadership contest instead of the
+ // mastership service.
+ triggerConfigCheck(deviceId, defaultContext);
+ }
+ }
+
private void configCheck(DeviceId deviceId, Bmv2DeviceContext storedContext) {
if (storedContext == null) {
return;
@@ -206,14 +217,14 @@
}
private void triggerConfigCheck(DeviceId deviceId, Bmv2DeviceContext context) {
- if (mastershipService.isLocalMaster(deviceId)) {
scheduledExecutor.schedule(() -> configCheck(deviceId, context), 0, TimeUnit.SECONDS);
- }
}
private void checkDevices() {
deviceService.getAvailableDevices().forEach(device -> {
- triggerConfigCheck(device.id(), getContext(device.id()));
+ if (mastershipService.isLocalMaster(device.id())) {
+ triggerConfigCheck(device.id(), getContext(device.id()));
+ }
});
}
@@ -235,8 +246,10 @@
public void event(MapEvent<DeviceId, Bmv2DeviceContext> event) {
DeviceId deviceId = event.key();
if (event.type().equals(INSERT) || event.type().equals(UPDATE)) {
- log.trace("Context {} for {}", event.type().name(), deviceId);
- triggerConfigCheck(deviceId, event.newValue().value());
+ if (mastershipService.isLocalMaster(deviceId)) {
+ log.trace("Context {} for {}", event.type().name(), deviceId);
+ triggerConfigCheck(deviceId, event.newValue().value());
+ }
}
}
}
diff --git a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java
index e9e7faa..5bd5228 100644
--- a/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java
+++ b/protocols/bmv2/ctl/src/main/java/org/onosproject/bmv2/ctl/Bmv2DeviceThriftClient.java
@@ -68,7 +68,7 @@
// See: https://github.com/p4lang/behavioral-model/blob/master/modules/bm_sim/include/bm_sim/context.h
private static final int CONTEXT_ID = 0;
- private final Standard.Iface standardClient;
+ protected final Standard.Iface standardClient;
private final SimpleSwitch.Iface simpleSwitchClient;
private final TTransport transport;
private final DeviceId deviceId;
@@ -418,6 +418,7 @@
try {
standardClient.bm_swap_configs();
+ simpleSwitchClient.force_swap();
log.debug("JSON config swapped! > deviceId={}", deviceId);
} catch (TException e) {
log.debug("Exception while swapping JSON config: {} > deviceId={}", e, deviceId);
diff --git a/protocols/bmv2/thrift-api/pom.xml b/protocols/bmv2/thrift-api/pom.xml
index 7b9bd6b..4f4e427 100644
--- a/protocols/bmv2/thrift-api/pom.xml
+++ b/protocols/bmv2/thrift-api/pom.xml
@@ -32,9 +32,9 @@
<properties>
<!-- BMv2 Commit ID and Thrift version -->
- <bmv2.commit>024aa03e3b52f8d32c26774511e8e5b1dc11ec65</bmv2.commit>
+ <bmv2.commit>8f675d0284e9e014f1b8ed502ba54e61d68108cf</bmv2.commit>
<bmv2.thrift.version>0.9.3</bmv2.thrift.version>
- <bmv2.baseurl>https://cdn.rawgit.com/opennetworkinglab/behavioral-model/${bmv2.commit}</bmv2.baseurl>
+ <bmv2.baseurl>https://cdn.rawgit.com/opennetworkinglab/onos-bmv2/${bmv2.commit}</bmv2.baseurl>
<bmv2.thrift.javanamespace>org.onosproject.bmv2.thriftapi</bmv2.thrift.javanamespace>
<bmv2.thrift.srcdir>${project.build.directory}/thrift-sources/${bmv2.commit}/</bmv2.thrift.srcdir>
<thrift.exedir>${project.build.directory}/thrift-compiler/</thrift.exedir>
diff --git a/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java b/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
index fda7aad..d4ab740 100644
--- a/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
+++ b/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
@@ -20,10 +20,7 @@
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.onlab.util.Timer;
+import org.onlab.util.SharedScheduledExecutors;
import org.onosproject.bmv2.api.runtime.Bmv2Device;
import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
import org.onosproject.bmv2.api.service.Bmv2Controller;
@@ -34,6 +31,7 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.incubator.net.config.basics.ConfigException;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
@@ -55,14 +53,20 @@
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.bmv2.api.runtime.Bmv2Device.*;
+import static org.onosproject.net.Device.Type.SWITCH;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
import static org.onosproject.provider.bmv2.device.impl.Bmv2PortStatisticsGetter.getPortStatistics;
import static org.onosproject.provider.bmv2.device.impl.Bmv2PortStatisticsGetter.initCounters;
@@ -76,20 +80,22 @@
private static final String APP_NAME = "org.onosproject.bmv2";
- private static final int POLL_INTERVAL = 5_000; // milliseconds
+ private static final int POLL_PERIOD = 5_000; // milliseconds
private final Logger log = getLogger(this.getClass());
private final ExecutorService executorService = Executors
.newFixedThreadPool(16, groupedThreads("onos/bmv2", "device-discovery", log));
+ private final ScheduledExecutorService scheduledExecutorService = SharedScheduledExecutors.getPoolThreadExecutor();
+
private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
private final ConfigFactory cfgFactory = new InternalConfigFactory();
- private final ConcurrentMap<DeviceId, DeviceDescription> activeDevices = Maps.newConcurrentMap();
+ private final Map<DeviceId, DeviceDescription> lastDescriptions = Maps.newHashMap();
- private final DevicePoller devicePoller = new DevicePoller();
+ private final ConcurrentMap<DeviceId, Lock> deviceLocks = Maps.newConcurrentMap();
private final InternalDeviceListener deviceListener = new InternalDeviceListener();
@@ -103,6 +109,9 @@
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected Bmv2Controller controller;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -112,6 +121,7 @@
protected Bmv2TableEntryService tableEntryService;
private ApplicationId appId;
+ private ScheduledFuture<?> poller;
/**
* Creates a Bmv2 device provider with the supplied identifier.
@@ -126,19 +136,24 @@
netCfgService.registerConfigFactory(cfgFactory);
netCfgService.addListener(cfgListener);
controller.addDeviceListener(deviceListener);
- devicePoller.start();
+ if (poller != null) {
+ poller.cancel(false);
+ }
+ poller = scheduledExecutorService.scheduleAtFixedRate(this::pollDevices, 1_000, POLL_PERIOD, MILLISECONDS);
super.activate();
}
@Override
protected void deactivate() {
- devicePoller.stop();
+ if (poller != null) {
+ poller.cancel(false);
+ }
controller.removeDeviceListener(deviceListener);
try {
- activeDevices.forEach((did, value) -> {
+ lastDescriptions.forEach((did, value) -> {
executorService.execute(() -> disconnectDevice(did));
});
- executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ executorService.awaitTermination(1000, MILLISECONDS);
} catch (InterruptedException e) {
log.error("Device discovery threads did not terminate");
}
@@ -181,32 +196,43 @@
}
private void discoverDevice(DeviceId did) {
- log.debug("Starting device discovery... deviceId={}", did);
- activeDevices.compute(did, (k, lastDescription) -> {
+ // Serialize discovery for the same device.
+ Lock lock = deviceLocks.computeIfAbsent(did, k -> new ReentrantLock());
+ lock.lock();
+ try {
+ log.debug("Starting device discovery... deviceId={}", did);
+
+ if (contextService.getContext(did) == null) {
+ // Device is a first timer.
+ log.info("Setting DEFAULT context for {}", did);
+ // It is important to do this before creating the device in the core
+ // so other services won't find a null context.
+ contextService.setDefaultContext(did);
+ // Abort discovery, we'll receive a new hello once the swap has been performed.
+ return;
+ }
+
+ DeviceDescription lastDescription = lastDescriptions.get(did);
DeviceDescription thisDescription = getDeviceDescription(did);
+
if (thisDescription != null) {
- boolean descriptionChanged = lastDescription != null &&
- (!Objects.equals(thisDescription, lastDescription) ||
- !Objects.equals(thisDescription.annotations(), lastDescription.annotations()));
+ boolean descriptionChanged = lastDescription == null ||
+ (!Objects.equals(thisDescription, lastDescription) ||
+ !Objects.equals(thisDescription.annotations(), lastDescription.annotations()));
if (descriptionChanged || !deviceService.isAvailable(did)) {
- if (deviceService.getDevice(did) == null) {
- // Device is a first timer.
- log.info("Setting DEFAULT context for {}", did);
- // It is important to do this before connecting the device so other
- // services won't find a null context.
- contextService.setContext(did, contextService.defaultContext());
- }
resetDeviceState(did);
initPortCounters(did);
providerService.deviceConnected(did, thisDescription);
updatePortsAndStats(did);
}
- return thisDescription;
+ lastDescriptions.put(did, thisDescription);
} else {
log.warn("Unable to get device description for {}", did);
- return lastDescription;
+ lastDescriptions.put(did, lastDescription);
}
- });
+ } finally {
+ lock.unlock();
+ }
}
private DeviceDescription getDeviceDescription(DeviceId did) {
@@ -269,11 +295,27 @@
}
private void disconnectDevice(DeviceId did) {
- log.debug("Trying to disconnect device from core... deviceId={}", did);
- if (deviceService.isAvailable(did)) {
- providerService.deviceDisconnected(did);
+ log.debug("Disconnecting device from core... deviceId={}", did);
+ providerService.deviceDisconnected(did);
+ lastDescriptions.remove(did);
+ }
+
+ private void pollDevices() {
+ for (Device device: deviceService.getAvailableDevices(SWITCH)) {
+ if (device.id().uri().getScheme().equals(SCHEME) &&
+ mastershipService.isLocalMaster(device.id())) {
+ executorService.execute(() -> pollingTask(device.id()));
+ }
}
- activeDevices.remove(did);
+ }
+
+ private void pollingTask(DeviceId deviceId) {
+ log.debug("Polling device {}...", deviceId);
+ if (isReachable(deviceId)) {
+ updatePortsAndStats(deviceId);
+ } else {
+ disconnectDevice(deviceId);
+ }
}
/**
@@ -332,50 +374,4 @@
triggerProbe(device.asDeviceId());
}
}
-
- /**
- * Task that periodically trigger device probes to check for device status and update port information.
- */
- private class DevicePoller implements TimerTask {
-
- private final HashedWheelTimer timer = Timer.getTimer();
- private Timeout timeout;
-
- @Override
- public void run(Timeout tout) throws Exception {
- if (tout.isCancelled()) {
- return;
- }
- activeDevices.keySet()
- .stream()
- // Filter out devices not yet created in the core.
- .filter(did -> deviceService.getDevice(did) != null)
- .forEach(did -> executorService.execute(() -> pollingTask(did)));
- tout.getTimer().newTimeout(this, POLL_INTERVAL, TimeUnit.MILLISECONDS);
- }
-
- private void pollingTask(DeviceId deviceId) {
- if (isReachable(deviceId)) {
- updatePortsAndStats(deviceId);
- } else {
- disconnectDevice(deviceId);
- }
- }
-
- /**
- * Starts the collector.
- */
- synchronized void start() {
- log.info("Starting device poller...");
- timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
- }
-
- /**
- * Stops the collector.
- */
- synchronized void stop() {
- log.info("Stopping device poller...");
- timeout.cancel();
- }
- }
}