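Use the 1.0.0.rc2 version of Atomix
CopycatTransport updates:
* drop the explicit connection-establishment handshake; the server now
  creates a connection on the first message carrying a new connection id
* use a single "onos-copycat-<partition>" subject for server-bound messages
* implement PartitionManager join/leave
* switch the partition server StorageLevel from MEMORY to DISK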
Change-Id: If384ac2574f098c327f0e5749766268c8d7f1ecd
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 53f57f1..8ed352b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -19,7 +19,7 @@
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializerFactory;
-import io.atomix.copycat.client.Query;
+import io.atomix.copycat.Query;
import org.onlab.util.Match;
import org.onosproject.cluster.Leader;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
index 5b15d33..96729c7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
@@ -16,7 +16,6 @@
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -24,10 +23,8 @@
import org.apache.commons.lang.math.RandomUtils;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
import com.google.common.collect.Sets;
-import com.google.common.primitives.Longs;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
@@ -39,42 +36,32 @@
*/
public class CopycatTransportClient implements Client {
- private final Logger log = getLogger(getClass());
private final PartitionId partitionId;
private final MessagingService messagingService;
private final CopycatTransport.Mode mode;
- private final String newConnectionMessageSubject;
private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
CopycatTransportClient(PartitionId partitionId, MessagingService messagingService, CopycatTransport.Mode mode) {
this.partitionId = checkNotNull(partitionId);
this.messagingService = checkNotNull(messagingService);
this.mode = checkNotNull(mode);
- this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
}
@Override
public CompletableFuture<Connection> connect(Address remoteAddress) {
ThreadContext context = ThreadContext.currentContextOrThrow();
- return messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
- newConnectionMessageSubject,
- Longs.toByteArray(nextConnectionId()))
- .thenApplyAsync(bytes -> {
- long connectionId = Longs.fromByteArray(bytes);
- CopycatTransportConnection connection = new CopycatTransportConnection(
- connectionId,
- CopycatTransport.Mode.CLIENT,
- partitionId,
- remoteAddress,
- messagingService,
- context);
- if (mode == CopycatTransport.Mode.CLIENT) {
- connection.setBidirectional();
- }
- log.debug("Created new outgoing connection[id={}] to {}", connectionId, remoteAddress);
- connections.add(connection);
- return connection;
- }, context.executor());
+ CopycatTransportConnection connection = new CopycatTransportConnection(
+ nextConnectionId(),
+ CopycatTransport.Mode.CLIENT,
+ partitionId,
+ remoteAddress,
+ messagingService,
+ context);
+ if (mode == CopycatTransport.Mode.CLIENT) {
+ connection.setBidirectional();
+ }
+ connections.add(connection);
+ return CompletableFuture.supplyAsync(() -> connection, context.executor());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index d1b686f..39ce10f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -31,14 +31,12 @@
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
@@ -54,7 +52,6 @@
*/
public class CopycatTransportConnection implements Connection {
- private final Logger log = getLogger(getClass());
private final Listeners<Throwable> exceptionListeners = new Listeners<>();
private final Listeners<Connection> closeListeners = new Listeners<>();
@@ -85,11 +82,11 @@
this.remoteAddress = checkNotNull(address);
this.messagingService = checkNotNull(messagingService);
if (mode == CopycatTransport.Mode.CLIENT) {
- this.outboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
- this.inboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
+ this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
+ this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
} else {
- this.outboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
- this.inboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
+ this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
+ this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
}
this.context = checkNotNull(context);
}
@@ -206,7 +203,6 @@
@Override
public CompletableFuture<Void> close() {
- log.debug("Closing connection[id={}, mode={}] to {}", connectionId, mode, remoteAddress);
closeListeners.forEach(listener -> listener.accept(this));
if (mode == CopycatTransport.Mode.CLIENT) {
messagingService.unregisterHandler(inboundMessageSubject);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index d4f851f..4a31570 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -35,7 +35,6 @@
import org.slf4j.Logger;
import com.google.common.collect.Maps;
-import com.google.common.primitives.Longs;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
@@ -55,15 +54,13 @@
private final ScheduledExecutorService executorService;
private final PartitionId partitionId;
private final MessagingService messagingService;
- private final String protocolMessageSubject;
- private final String newConnectionMessageSubject;
+ private final String messageSubject;
private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
this.partitionId = checkNotNull(partitionId);
this.messagingService = checkNotNull(messagingService);
- this.protocolMessageSubject = String.format("onos-copycat-server-%s", partitionId);
- this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
+ this.messageSubject = String.format("onos-copycat-%s", partitionId);
this.executorService = Executors.newScheduledThreadPool(Math.min(4, Runtime.getRuntime().availableProcessors()),
new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
}
@@ -71,49 +68,49 @@
@Override
public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
if (listening.compareAndSet(false, true)) {
- // message handler for all non-connection-establishment messages.
- messagingService.registerHandler(protocolMessageSubject, (sender, payload) -> {
- try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
- long connectionId = input.readLong();
- CopycatTransportConnection connection = connections.get(connectionId);
- if (connection == null) {
- throw new IOException("Closed connection");
- }
- byte[] messagePayload = IOUtils.toByteArray(input);
- return connection.handle(messagePayload);
- } catch (IOException e) {
- return Tools.exceptionalFuture(e);
- }
- });
-
- // message handler for new connection attempts.
ThreadContext context = ThreadContext.currentContextOrThrow();
- messagingService.registerHandler(newConnectionMessageSubject, (sender, payload) -> {
- long connectionId = Longs.fromByteArray(payload);
- CopycatTransportConnection connection = new CopycatTransportConnection(connectionId,
- CopycatTransport.Mode.SERVER,
- partitionId,
- CopycatTransport.toAddress(sender),
- messagingService,
- getOrCreateContext(context));
- connections.put(connectionId, connection);
- connection.closeListener(c -> connections.remove(connectionId, c));
- log.debug("Created new incoming connection[id={}] from {}", connectionId, sender);
- return CompletableFuture.supplyAsync(() -> {
- listener.accept(connection);
- // echo the connectionId back to indicate successful completion.
- return payload;
- }, context.executor());
- });
- context.execute(() -> listenFuture.complete(null));
+ listen(address, listener, context);
}
return listenFuture;
}
+ private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
+ messagingService.registerHandler(messageSubject, (sender, payload) -> {
+ try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
+ long connectionId = input.readLong();
+ AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
+ CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
+ newConnectionCreated.set(true);
+ CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
+ CopycatTransport.Mode.SERVER,
+ partitionId,
+ CopycatTransport.toAddress(sender),
+ messagingService,
+ getOrCreateContext(context));
+ log.debug("Created new incoming connection {}", connectionId);
+ newConnection.closeListener(c -> connections.remove(connectionId, c));
+ return newConnection;
+ });
+ byte[] request = IOUtils.toByteArray(input);
+ return CompletableFuture.supplyAsync(
+ () -> {
+ if (newConnectionCreated.get()) {
+ listener.accept(connection);
+ }
+ return connection;
+ }, context.executor()).thenCompose(c -> c.handle(request));
+ } catch (IOException e) {
+ return Tools.exceptionalFuture(e);
+ }
+ });
+ context.execute(() -> {
+ listenFuture.complete(null);
+ });
+ }
+
@Override
public CompletableFuture<Void> close() {
- messagingService.unregisterHandler(newConnectionMessageSubject);
- messagingService.unregisterHandler(protocolMessageSubject);
+ messagingService.unregisterHandler(messageSubject);
executorService.shutdown();
return CompletableFuture.completedFuture(null);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 54abda2..45bd171 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -105,14 +105,28 @@
@Override
public CompletableFuture<Void> leave(PartitionId partitionId) {
- // TODO: Implement
- return Tools.exceptionalFuture(new UnsupportedOperationException());
+        // Report an unknown partition through the returned future instead of
+        // letting a null lookup result raise a synchronous NullPointerException.
+        StoragePartition partition = partitions.get(partitionId);
+        if (partition == null) {
+            return Tools.exceptionalFuture(
+                    new IllegalArgumentException("Unknown partition " + partitionId));
+        }
+        // Leaving closes the local server, if one exists for this partition.
+        return partition.server()
+                .map(server -> server.close())
+                .orElse(CompletableFuture.completedFuture(null));
}
@Override
public CompletableFuture<Void> join(PartitionId partitionId) {
- // TODO: Implement
- return Tools.exceptionalFuture(new UnsupportedOperationException());
+        // Same guard as leave(): fail the future for an unknown partition id.
+        StoragePartition partition = partitions.get(partitionId);
+        if (partition == null) {
+            return Tools.exceptionalFuture(
+                    new IllegalArgumentException("Unknown partition " + partitionId));
+        }
+        return partition.open();
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 41cdf7c..3b142a7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -72,10 +72,22 @@
this.logFolder = logFolder;
}
+ /**
+ * Returns the partition client instance.
+ * @return client
+ */
public StoragePartitionClient client() {
return client;
}
+ /**
+ * Returns the optional server instance.
+ * @return server
+ */
+ public Optional<StoragePartitionServer> server() {
+ return server;
+ }
+
@Override
public CompletableFuture<Void> open() {
return openServer().thenAccept(s -> server = Optional.ofNullable(s))
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index d674ebe..e6669dc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -114,8 +114,7 @@
.withTransport(transport.get())
.withStateMachine(() -> new ResourceManagerState(registry))
.withStorage(Storage.builder()
- // FIXME: StorageLevel should be DISK
- .withStorageLevel(StorageLevel.MEMORY)
+ .withStorageLevel(StorageLevel.DISK)
.withCompactionThreads(1)
.withDirectory(dataFolder)
.withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)