[ONOS-6075] Rewrite Copycat Transport
- Ensure connection IDs are globally unique
- Ensure connections are closed on each side when close() is called
- Add Transport unit tests
Change-Id: Ia848b075d4030ce74293ecc57fea983693cee265
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
index 913dc6c..75a75d7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
@@ -15,54 +15,92 @@
*/
package org.onosproject.store.primitives.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-import org.apache.commons.lang.math.RandomUtils;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.MessagingService;
-
+import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
-
+import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.concurrent.ThreadContext;
+import io.atomix.catalyst.transport.TransportException;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingException;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.store.primitives.impl.CopycatTransport.CONNECT;
+import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
/**
- * {@link Client} implementation for {@link CopycatTransport}.
+ * Copycat transport client implementation.
*/
public class CopycatTransportClient implements Client {
-
+ private final Logger log = LoggerFactory.getLogger(getClass());
private final PartitionId partitionId;
+ private final String serverSubject;
private final MessagingService messagingService;
- private final CopycatTransport.Mode mode;
private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
- CopycatTransportClient(PartitionId partitionId, MessagingService messagingService, CopycatTransport.Mode mode) {
- this.partitionId = checkNotNull(partitionId);
- this.messagingService = checkNotNull(messagingService);
- this.mode = checkNotNull(mode);
+ /**
+ * Creates a transport client for the given partition.
+ *
+ * @param partitionId the partition identifier; must not be null
+ * @param messagingService the messaging service used to send connect requests; must not be null
+ */
+ public CopycatTransportClient(PartitionId partitionId, MessagingService messagingService) {
+ this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
+ // Subject to which connect requests for this partition are sent.
+ this.serverSubject = String.format("onos-copycat-%s", partitionId);
+ this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
}
+ /**
+ * Opens a connection to the server at the given address.
+ * <p>
+ * A single-byte {@code CONNECT} request is sent to this partition's server subject. When the
+ * server replies with {@code SUCCESS}, the connection ID read from the reply is used to create
+ * the local connection, which stays registered with this client until its close listener fires.
+ *
+ * @param address the address of the server to connect to
+ * @return a future completed with the new connection, or completed exceptionally if the request
+ * fails, the server rejects it, or the response cannot be decoded
+ */
@Override
- public CompletableFuture<Connection> connect(Address remoteAddress) {
+ public CompletableFuture<Connection> connect(Address address) {
+ CompletableFuture<Connection> future = new CompletableFuture<>();
ThreadContext context = ThreadContext.currentContextOrThrow();
- CopycatTransportConnection connection = new CopycatTransportConnection(
- nextConnectionId(),
- CopycatTransport.Mode.CLIENT,
- partitionId,
- remoteAddress,
- messagingService,
- context);
- if (mode == CopycatTransport.Mode.CLIENT) {
- connection.setBidirectional();
- }
- connection.closeListener(c -> connections.remove(c));
- connections.add(connection);
- return CompletableFuture.supplyAsync(() -> connection, context.executor());
+ Endpoint endpoint = CopycatTransport.toEndpoint(address);
+
+ log.debug("Connecting to {}", address);
+
+ ByteBuffer requestBuffer = ByteBuffer.allocate(1);
+ requestBuffer.put(CONNECT);
+
+ // Send a connect request to the server to get a unique connection ID.
+ messagingService.sendAndReceive(endpoint, serverSubject, requestBuffer.array(), context.executor())
+ .whenComplete((payload, error) -> {
+ if (error != null) {
+ // Surface messaging-layer failures as transport failures; pass anything else through as-is.
+ Throwable wrappedError = error;
+ Throwable rootCause = Throwables.getRootCause(error);
+ if (rootCause instanceof MessagingException) {
+ wrappedError = new TransportException(error);
+ }
+ log.warn("Connection to {} failed! Reason: {}", address, wrappedError);
+ future.completeExceptionally(wrappedError);
+ return;
+ }
+
+ // If the connection is successful, the server will send back a
+ // connection ID indicating where to send messages for the connection.
+ try {
+ ByteBuffer responseBuffer = ByteBuffer.wrap(payload);
+ if (responseBuffer.get() == SUCCESS) {
+ long connectionId = responseBuffer.getLong();
+ CopycatTransportConnection connection = new CopycatTransportConnection(
+ connectionId,
+ CopycatTransportConnection.Mode.CLIENT,
+ partitionId,
+ endpoint,
+ messagingService,
+ context);
+ connection.closeListener(connections::remove);
+ connections.add(connection);
+ future.complete(connection);
+ log.debug("Created connection {}-{} to {}", partitionId, connectionId, address);
+ } else {
+ // The placeholder previously had no argument, so the address was never logged.
+ log.warn("Connection to {} failed!", address);
+ future.completeExceptionally(
+ new ConnectException("Connection to " + address + " was rejected by the server"));
+ }
+ } catch (RuntimeException e) {
+ // A null or truncated payload (or a failure while building the connection) would
+ // otherwise escape this callback and leave the returned future incomplete forever.
+ log.warn("Invalid connect response from {}", address, e);
+ future.completeExceptionally(new TransportException(e));
+ }
+ });
+ return future;
}
@Override
@@ -70,7 +108,11 @@
return CompletableFuture.allOf(connections.stream().map(Connection::close).toArray(CompletableFuture[]::new));
}
- private long nextConnectionId() {
- return RandomUtils.nextLong();
+ /**
+ * Returns a string representation of this client that includes its partition ID.
+ */
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("partitionId", partitionId)
+ .toString();
}
- }
+}
+