Copycat transport enhancements
Change-Id: I50e9eb0f419b2aa10deff6d54f58649688788faa
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 2cecc0f..53f57f1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -15,29 +15,19 @@
*/
package org.onosproject.store.primitives.impl;
-import io.atomix.catalyst.serializer.CatalystSerializable;
+import java.util.Arrays;
+
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializerFactory;
import io.atomix.copycat.client.Query;
-import io.atomix.manager.state.GetResource;
-import io.atomix.manager.state.GetResourceKeys;
-import io.atomix.resource.ResourceQuery;
-import io.atomix.variables.state.ValueCommands;
-
-import java.io.IOException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Enumeration;
-import java.util.Scanner;
import org.onlab.util.Match;
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapState;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
@@ -47,7 +37,6 @@
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
-import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
/**
@@ -65,113 +54,31 @@
org.onosproject.store.service.Serializer.using(Arrays.asList((KryoNamespaces.API)),
MapEntryUpdateResult.class,
MapEntryUpdateResult.Status.class,
- MapUpdate.class,
- MapUpdate.Type.class,
- MapTransaction.class,
Transaction.State.class,
- TransactionId.class,
PrepareResult.class,
CommitResult.class,
RollbackResult.class,
- AtomixConsistentMapCommands.Get.class,
- AtomixConsistentMapCommands.ContainsKey.class,
- AtomixConsistentMapCommands.ContainsValue.class,
- AtomixConsistentMapCommands.Size.class,
- AtomixConsistentMapCommands.IsEmpty.class,
- AtomixConsistentMapCommands.KeySet.class,
- AtomixConsistentMapCommands.EntrySet.class,
- AtomixConsistentMapCommands.Values.class,
- AtomixConsistentMapCommands.UpdateAndGet.class,
- AtomixConsistentMapCommands.TransactionPrepare.class,
- AtomixConsistentMapCommands.TransactionCommit.class,
- AtomixConsistentMapCommands.TransactionRollback.class,
- AtomixLeaderElectorCommands.GetLeadership.class,
- AtomixLeaderElectorCommands.GetAllLeaderships.class,
- AtomixLeaderElectorCommands.GetElectedTopics.class,
- AtomixLeaderElectorCommands.Run.class,
- AtomixLeaderElectorCommands.Withdraw.class,
- AtomixLeaderElectorCommands.Anoint.class,
- GetResource.class,
- GetResourceKeys.class,
- ResourceQuery.class,
- ValueCommands.Get.class,
- ValueCommands.Set.class,
Query.ConsistencyLevel.class));
// ONOS classes
serializer.register(Change.class, factory);
+ serializer.register(Leader.class, factory);
+ serializer.register(Leadership.class, factory);
serializer.register(NodeId.class, factory);
serializer.register(Match.class, factory);
serializer.register(MapEntryUpdateResult.class, factory);
serializer.register(MapEntryUpdateResult.Status.class, factory);
- serializer.register(MapTransaction.class, factory);
serializer.register(Transaction.State.class, factory);
serializer.register(PrepareResult.class, factory);
serializer.register(CommitResult.class, factory);
serializer.register(RollbackResult.class, factory);
serializer.register(TransactionId.class, factory);
serializer.register(MapUpdate.class, factory);
+ serializer.register(MapUpdate.Type.class, factory);
+ serializer.register(MapTransaction.class, factory);
serializer.register(Versioned.class, factory);
serializer.register(MapEvent.class, factory);
serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
- serializer.register(AtomixConsistentMapState.class, factory);
- serializer.register(ResourceQuery.class, factory);
- serializer.register(GetResource.class, factory);
- serializer.register(GetResourceKeys.class, factory);
- serializer.register(ValueCommands.Get.class, factory);
- serializer.register(ValueCommands.Set.class, factory);
-
- // ConsistentMap
- serializer.register(AtomixConsistentMapCommands.UpdateAndGet.class, factory);
- serializer.register(AtomixConsistentMapCommands.Clear.class);
- serializer.register(AtomixConsistentMapCommands.Listen.class);
- serializer.register(AtomixConsistentMapCommands.Unlisten.class);
- serializer.register(AtomixConsistentMapCommands.Get.class);
- serializer.register(AtomixConsistentMapCommands.ContainsKey.class);
- serializer.register(AtomixConsistentMapCommands.ContainsValue.class);
- serializer.register(AtomixConsistentMapCommands.EntrySet.class);
- serializer.register(AtomixConsistentMapCommands.IsEmpty.class);
- serializer.register(AtomixConsistentMapCommands.KeySet.class);
- serializer.register(AtomixConsistentMapCommands.Size.class);
- serializer.register(AtomixConsistentMapCommands.Values.class);
- serializer.register(AtomixConsistentMapCommands.TransactionPrepare.class);
- serializer.register(AtomixConsistentMapCommands.TransactionCommit.class);
- serializer.register(AtomixConsistentMapCommands.TransactionRollback.class);
- // LeaderElector
- serializer.register(AtomixLeaderElectorCommands.Run.class, factory);
- serializer.register(AtomixLeaderElectorCommands.Withdraw.class, factory);
- serializer.register(AtomixLeaderElectorCommands.Anoint.class, factory);
- serializer.register(AtomixLeaderElectorCommands.GetElectedTopics.class, factory);
- serializer.register(AtomixLeaderElectorCommands.GetElectedTopics.class, factory);
- serializer.register(AtomixLeaderElectorCommands.GetLeadership.class, factory);
- serializer.register(AtomixLeaderElectorCommands.GetAllLeaderships.class, factory);
- serializer.register(AtomixLeaderElectorCommands.Listen.class);
- serializer.register(AtomixLeaderElectorCommands.Unlisten.class);
- // Atomix types
- try {
- ClassLoader cl = CatalystSerializable.class.getClassLoader();
- Enumeration<URL> urls = cl.getResources(
- String.format("META-INF/services/%s", CatalystSerializable.class.getName()));
- while (urls.hasMoreElements()) {
- URL url = urls.nextElement();
- try (Scanner scanner = new Scanner(url.openStream(), "UTF-8")) {
- scanner.useDelimiter("\n").forEachRemaining(line -> {
- if (!line.trim().startsWith("#")) {
- line = line.trim();
- if (line.length() > 0) {
- try {
- serializer.register(cl.loadClass(line));
- } catch (ClassNotFoundException e) {
- Throwables.propagate(e);
- }
- }
- }
- });
- }
- }
- } catch (IOException e) {
- Throwables.propagate(e);
- }
return serializer;
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
index ddec252..b5b1f21 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
@@ -17,9 +17,20 @@
import static com.google.common.base.Preconditions.checkNotNull;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+
+import org.onlab.packet.IpAddress;
import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+
+import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Server;
import io.atomix.catalyst.transport.Transport;
@@ -51,6 +62,8 @@
private final Mode mode;
private final PartitionId partitionId;
private final MessagingService messagingService;
+ private static final Map<Address, Endpoint> EP_LOOKUP_CACHE = Maps.newConcurrentMap();
+ private static final Map<Endpoint, Address> ADDRESS_LOOKUP_CACHE = Maps.newConcurrentMap();
public CopycatTransport(Mode mode, PartitionId partitionId, MessagingService messagingService) {
this.mode = checkNotNull(mode);
@@ -70,4 +83,42 @@
return new CopycatTransportServer(partitionId,
messagingService);
}
+
+ /**
+ * Maps {@link Address address} to {@link Endpoint endpoint}.
+ * @param address
+ * @return end point
+ */
+ public static Endpoint toEndpoint(Address address) {
+ return EP_LOOKUP_CACHE.computeIfAbsent(address, a -> {
+ try {
+ Endpoint endpoint = new Endpoint(IpAddress.valueOf(InetAddress.getByName(a.host())), a.port());
+ ADDRESS_LOOKUP_CACHE.putIfAbsent(endpoint, address);
+ return endpoint;
+ } catch (UnknownHostException e) {
+ Throwables.propagate(e);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Maps {@link Endpoint endpoint} to {@link Address address}.
+ * @param endpoint end point
+ * @return address
+ */
+ public static Address toAddress(Endpoint endpoint) {
+ return ADDRESS_LOOKUP_CACHE.computeIfAbsent(endpoint, ep -> {
+ try {
+ InetAddress host = InetAddress.getByAddress(endpoint.host().toOctets());
+ int port = endpoint.port();
+ Address address = new Address(new InetSocketAddress(host, port));
+ EP_LOOKUP_CACHE.putIfAbsent(address, endpoint);
+ return address;
+ } catch (UnknownHostException e) {
+ Throwables.propagate(e);
+ return null;
+ }
+ });
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
index 96729c7..9a1dce9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
@@ -50,18 +50,23 @@
@Override
public CompletableFuture<Connection> connect(Address remoteAddress) {
ThreadContext context = ThreadContext.currentContextOrThrow();
- CopycatTransportConnection connection = new CopycatTransportConnection(
- nextConnectionId(),
- CopycatTransport.Mode.CLIENT,
- partitionId,
- remoteAddress,
- messagingService,
- context);
- if (mode == CopycatTransport.Mode.CLIENT) {
- connection.setBidirectional();
- }
- connections.add(connection);
- return CompletableFuture.supplyAsync(() -> connection, context.executor());
+ return messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
+ PartitionManager.HELLO_MESSAGE_SUBJECT,
+ "hello".getBytes())
+ .thenApplyAsync(r -> {
+ CopycatTransportConnection connection = new CopycatTransportConnection(
+ nextConnectionId(),
+ CopycatTransport.Mode.CLIENT,
+ partitionId,
+ remoteAddress,
+ messagingService,
+ context);
+ if (mode == CopycatTransport.Mode.CLIENT) {
+ connection.setBidirectional();
+ }
+ connections.add(connection);
+ return connection;
+ }, context.executor());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index 3c5b649..39ce10f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -21,8 +21,6 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -30,10 +28,8 @@
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
-import org.onlab.packet.IpAddress;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import com.google.common.base.MoreObjects;
@@ -74,7 +70,6 @@
private final AtomicInteger sendFailures = new AtomicInteger(0);
private final AtomicInteger messagesReceived = new AtomicInteger(0);
private final AtomicInteger receiveFailures = new AtomicInteger(0);
- private final Map<Address, Endpoint> endpointLookupCache = Maps.newConcurrentMap();
CopycatTransportConnection(long connectionId,
CopycatTransport.Mode mode,
@@ -120,7 +115,7 @@
if (message instanceof ReferenceCounted) {
((ReferenceCounted<?>) message).release();
}
- messagingService.sendAndReceive(toEndpoint(remoteAddress),
+ messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
outboundMessageSubject,
baos.toByteArray(),
context.executor())
@@ -240,17 +235,6 @@
.toString();
}
- private Endpoint toEndpoint(Address address) {
- return endpointLookupCache.computeIfAbsent(address, a -> {
- try {
- return new Endpoint(IpAddress.valueOf(InetAddress.getByName(a.host())), a.port());
- } catch (UnknownHostException e) {
- Throwables.propagate(e);
- return null;
- }
- });
- }
-
@SuppressWarnings("rawtypes")
private final class InternalHandler {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index 6acb7db..03a5a71 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -16,13 +16,11 @@
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,8 +30,8 @@
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
-import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import io.atomix.catalyst.transport.Address;
@@ -47,6 +45,7 @@
*/
public class CopycatTransportServer implements Server {
+ private final Logger log = getLogger(getClass());
private final AtomicBoolean listening = new AtomicBoolean(false);
private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
private final PartitionId partitionId;
@@ -73,28 +72,23 @@
messagingService.registerHandler(messageSubject, (sender, payload) -> {
try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
long connectionId = input.readLong();
- AtomicBoolean newConnection = new AtomicBoolean(false);
+ AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
- newConnection.set(true);
- try {
- InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
- int senderPort = sender.port();
- Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
- return new CopycatTransportConnection(connectionId,
- CopycatTransport.Mode.SERVER,
- partitionId,
- senderAddress,
- messagingService,
- getOrCreateContext(context));
- } catch (UnknownHostException e) {
- Throwables.propagate(e);
- return null;
- }
+ newConnectionCreated.set(true);
+ CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
+ CopycatTransport.Mode.SERVER,
+ partitionId,
+ CopycatTransport.toAddress(sender),
+ messagingService,
+ getOrCreateContext(context));
+ log.debug("Created new incoming connection {}", connectionId);
+ newConnection.closeListener(c -> connections.remove(connectionId, c));
+ return newConnection;
});
byte[] request = IOUtils.toByteArray(input);
return CompletableFuture.supplyAsync(
() -> {
- if (newConnection.get()) {
+ if (newConnectionCreated.get()) {
listener.accept(connection);
}
return connection;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 14e1d96..76709d6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -57,6 +57,7 @@
public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
implements PartitionService, PartitionAdminService {
+ public static final String HELLO_MESSAGE_SUBJECT = "partition-manager-hello";
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -72,6 +73,8 @@
@Activate
public void activate() {
+ messagingService.registerHandler(HELLO_MESSAGE_SUBJECT,
+ (ep, input) -> CompletableFuture.completedFuture(input));
eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
metadataService.getClusterMetadata()
@@ -92,6 +95,7 @@
}
public void deactivate() {
+ messagingService.unregisterHandler(HELLO_MESSAGE_SUBJECT);
eventDispatcher.removeSink(PartitionEvent.class);
CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
@@ -151,4 +155,4 @@
.map(Optional::get)
.collect(Collectors.toList());
}
-}
\ No newline at end of file
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 293deef..854aa3c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.primitives.impl;
+import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.catalyst.transport.Transport;
@@ -38,6 +39,7 @@
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
+import org.slf4j.Logger;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
@@ -48,6 +50,8 @@
*/
public class StoragePartitionClient implements DistributedPrimitiveCreator, Managed<StoragePartitionClient> {
+ private final Logger log = getLogger(getClass());
+
private final StoragePartition partition;
private final Transport transport;
private final io.atomix.catalyst.serializer.Serializer serializer;
@@ -82,7 +86,13 @@
.withTransport(transport)
.build();
}
- return client.open().thenApply(v -> null);
+ return client.open().whenComplete((r, e) -> {
+ if (e == null) {
+ log.info("Successfully started client for partition {}", partition.getId());
+ } else {
+ log.info("Failed to start client for partition {}", partition.getId(), e);
+ }
+ }).thenApply(v -> null);
}
@Override
@@ -156,4 +166,4 @@
public boolean isClosed() {
return client.isClosed();
}
-}
\ No newline at end of file
+}