Copycat transport enhancements
Change-Id: I50e9eb0f419b2aa10deff6d54f58649688788faa
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index 6acb7db..03a5a71 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -16,13 +16,11 @@
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,8 +30,8 @@
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
-import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import io.atomix.catalyst.transport.Address;
@@ -47,6 +45,7 @@
*/
public class CopycatTransportServer implements Server {
+ private final Logger log = getLogger(getClass());
private final AtomicBoolean listening = new AtomicBoolean(false);
private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
private final PartitionId partitionId;
@@ -73,28 +72,23 @@
messagingService.registerHandler(messageSubject, (sender, payload) -> {
try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
long connectionId = input.readLong();
- AtomicBoolean newConnection = new AtomicBoolean(false);
+ AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
- newConnection.set(true);
- try {
- InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
- int senderPort = sender.port();
- Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
- return new CopycatTransportConnection(connectionId,
- CopycatTransport.Mode.SERVER,
- partitionId,
- senderAddress,
- messagingService,
- getOrCreateContext(context));
- } catch (UnknownHostException e) {
- Throwables.propagate(e);
- return null;
- }
+ newConnectionCreated.set(true);
+ CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
+ CopycatTransport.Mode.SERVER,
+ partitionId,
+ CopycatTransport.toAddress(sender),
+ messagingService,
+ getOrCreateContext(context));
+ log.debug("Created new incoming connection {}", connectionId);
+ newConnection.closeListener(c -> connections.remove(connectionId, c));
+ return newConnection;
});
byte[] request = IOUtils.toByteArray(input);
return CompletableFuture.supplyAsync(
() -> {
- if (newConnection.get()) {
+ if (newConnectionCreated.get()) {
listener.accept(connection);
}
return connection;