Using 1.0.0.rc2 version of Atomix
CopycatTransport updates
Change-Id: If384ac2574f098c327f0e5749766268c8d7f1ecd
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index d4f851f..4a31570 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -35,7 +35,6 @@
import org.slf4j.Logger;
import com.google.common.collect.Maps;
-import com.google.common.primitives.Longs;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
@@ -55,15 +54,13 @@
private final ScheduledExecutorService executorService;
private final PartitionId partitionId;
private final MessagingService messagingService;
- private final String protocolMessageSubject;
- private final String newConnectionMessageSubject;
+ private final String messageSubject;
private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
this.partitionId = checkNotNull(partitionId);
this.messagingService = checkNotNull(messagingService);
- this.protocolMessageSubject = String.format("onos-copycat-server-%s", partitionId);
- this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
+ this.messageSubject = String.format("onos-copycat-%s", partitionId);
this.executorService = Executors.newScheduledThreadPool(Math.min(4, Runtime.getRuntime().availableProcessors()),
new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
}
@@ -71,49 +68,49 @@
@Override
public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
if (listening.compareAndSet(false, true)) {
- // message handler for all non-connection-establishment messages.
- messagingService.registerHandler(protocolMessageSubject, (sender, payload) -> {
- try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
- long connectionId = input.readLong();
- CopycatTransportConnection connection = connections.get(connectionId);
- if (connection == null) {
- throw new IOException("Closed connection");
- }
- byte[] messagePayload = IOUtils.toByteArray(input);
- return connection.handle(messagePayload);
- } catch (IOException e) {
- return Tools.exceptionalFuture(e);
- }
- });
-
- // message handler for new connection attempts.
ThreadContext context = ThreadContext.currentContextOrThrow();
- messagingService.registerHandler(newConnectionMessageSubject, (sender, payload) -> {
- long connectionId = Longs.fromByteArray(payload);
- CopycatTransportConnection connection = new CopycatTransportConnection(connectionId,
- CopycatTransport.Mode.SERVER,
- partitionId,
- CopycatTransport.toAddress(sender),
- messagingService,
- getOrCreateContext(context));
- connections.put(connectionId, connection);
- connection.closeListener(c -> connections.remove(connectionId, c));
- log.debug("Created new incoming connection[id={}] from {}", connectionId, sender);
- return CompletableFuture.supplyAsync(() -> {
- listener.accept(connection);
- // echo the connectionId back to indicate successful completion.
- return payload;
- }, context.executor());
- });
- context.execute(() -> listenFuture.complete(null));
+ listen(address, listener, context);
}
return listenFuture;
}
+ private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
+ messagingService.registerHandler(messageSubject, (sender, payload) -> {
+ try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
+ long connectionId = input.readLong();
+ AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
+ CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
+ newConnectionCreated.set(true);
+ CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
+ CopycatTransport.Mode.SERVER,
+ partitionId,
+ CopycatTransport.toAddress(sender),
+ messagingService,
+ getOrCreateContext(context));
+ log.debug("Created new incoming connection {}", connectionId);
+ newConnection.closeListener(c -> connections.remove(connectionId, c));
+ return newConnection;
+ });
+ byte[] request = IOUtils.toByteArray(input);
+ return CompletableFuture.supplyAsync(
+ () -> {
+ if (newConnectionCreated.get()) {
+ listener.accept(connection);
+ }
+ return connection;
+ }, context.executor()).thenCompose(c -> c.handle(request));
+ } catch (IOException e) {
+ return Tools.exceptionalFuture(e);
+ }
+ });
+ context.execute(() -> {
+ listenFuture.complete(null);
+ });
+ }
+
@Override
public CompletableFuture<Void> close() {
- messagingService.unregisterHandler(newConnectionMessageSubject);
- messagingService.unregisterHandler(protocolMessageSubject);
+ messagingService.unregisterHandler(messageSubject);
executorService.shutdown();
return CompletableFuture.completedFuture(null);
}