Using 1.0.0.rc2 version of Atomix
CopycatTransport updates
Change-Id: If384ac2574f098c327f0e5749766268c8d7f1ecd
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncLeaderElector.java b/core/api/src/main/java/org/onosproject/store/service/AsyncLeaderElector.java
index 609ac70..3aed8d2 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncLeaderElector.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncLeaderElector.java
@@ -120,6 +120,6 @@
* @return new {@code LeaderElector} instance
*/
default LeaderElector asLeaderElector() {
- return asLeaderElector(DEFAULT_OPERTATION_TIMEOUT_MILLIS);
+ return asLeaderElector(Long.MAX_VALUE);
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 53f57f1..8ed352b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -19,7 +19,7 @@
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializerFactory;
-import io.atomix.copycat.client.Query;
+import io.atomix.copycat.Query;
import org.onlab.util.Match;
import org.onosproject.cluster.Leader;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
index 5b15d33..96729c7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
@@ -16,7 +16,6 @@
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -24,10 +23,8 @@
import org.apache.commons.lang.math.RandomUtils;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
import com.google.common.collect.Sets;
-import com.google.common.primitives.Longs;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
@@ -39,42 +36,32 @@
*/
public class CopycatTransportClient implements Client {
- private final Logger log = getLogger(getClass());
private final PartitionId partitionId;
private final MessagingService messagingService;
private final CopycatTransport.Mode mode;
- private final String newConnectionMessageSubject;
private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
CopycatTransportClient(PartitionId partitionId, MessagingService messagingService, CopycatTransport.Mode mode) {
this.partitionId = checkNotNull(partitionId);
this.messagingService = checkNotNull(messagingService);
this.mode = checkNotNull(mode);
- this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
}
@Override
public CompletableFuture<Connection> connect(Address remoteAddress) {
ThreadContext context = ThreadContext.currentContextOrThrow();
- return messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
- newConnectionMessageSubject,
- Longs.toByteArray(nextConnectionId()))
- .thenApplyAsync(bytes -> {
- long connectionId = Longs.fromByteArray(bytes);
- CopycatTransportConnection connection = new CopycatTransportConnection(
- connectionId,
- CopycatTransport.Mode.CLIENT,
- partitionId,
- remoteAddress,
- messagingService,
- context);
- if (mode == CopycatTransport.Mode.CLIENT) {
- connection.setBidirectional();
- }
- log.debug("Created new outgoing connection[id={}] to {}", connectionId, remoteAddress);
- connections.add(connection);
- return connection;
- }, context.executor());
+ CopycatTransportConnection connection = new CopycatTransportConnection(
+ nextConnectionId(),
+ CopycatTransport.Mode.CLIENT,
+ partitionId,
+ remoteAddress,
+ messagingService,
+ context);
+ if (mode == CopycatTransport.Mode.CLIENT) {
+ connection.setBidirectional();
+ }
+ connections.add(connection);
+ return CompletableFuture.supplyAsync(() -> connection, context.executor());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index d1b686f..39ce10f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -31,14 +31,12 @@
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
@@ -54,7 +52,6 @@
*/
public class CopycatTransportConnection implements Connection {
- private final Logger log = getLogger(getClass());
private final Listeners<Throwable> exceptionListeners = new Listeners<>();
private final Listeners<Connection> closeListeners = new Listeners<>();
@@ -85,11 +82,11 @@
this.remoteAddress = checkNotNull(address);
this.messagingService = checkNotNull(messagingService);
if (mode == CopycatTransport.Mode.CLIENT) {
- this.outboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
- this.inboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
+ this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
+ this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
} else {
- this.outboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
- this.inboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
+ this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
+ this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
}
this.context = checkNotNull(context);
}
@@ -206,7 +203,6 @@
@Override
public CompletableFuture<Void> close() {
- log.debug("Closing connection[id={}, mode={}] to {}", connectionId, mode, remoteAddress);
closeListeners.forEach(listener -> listener.accept(this));
if (mode == CopycatTransport.Mode.CLIENT) {
messagingService.unregisterHandler(inboundMessageSubject);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index d4f851f..4a31570 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -35,7 +35,6 @@
import org.slf4j.Logger;
import com.google.common.collect.Maps;
-import com.google.common.primitives.Longs;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
@@ -55,15 +54,13 @@
private final ScheduledExecutorService executorService;
private final PartitionId partitionId;
private final MessagingService messagingService;
- private final String protocolMessageSubject;
- private final String newConnectionMessageSubject;
+ private final String messageSubject;
private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
this.partitionId = checkNotNull(partitionId);
this.messagingService = checkNotNull(messagingService);
- this.protocolMessageSubject = String.format("onos-copycat-server-%s", partitionId);
- this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
+ this.messageSubject = String.format("onos-copycat-%s", partitionId);
this.executorService = Executors.newScheduledThreadPool(Math.min(4, Runtime.getRuntime().availableProcessors()),
new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
}
@@ -71,49 +68,49 @@
@Override
public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
if (listening.compareAndSet(false, true)) {
- // message handler for all non-connection-establishment messages.
- messagingService.registerHandler(protocolMessageSubject, (sender, payload) -> {
- try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
- long connectionId = input.readLong();
- CopycatTransportConnection connection = connections.get(connectionId);
- if (connection == null) {
- throw new IOException("Closed connection");
- }
- byte[] messagePayload = IOUtils.toByteArray(input);
- return connection.handle(messagePayload);
- } catch (IOException e) {
- return Tools.exceptionalFuture(e);
- }
- });
-
- // message handler for new connection attempts.
ThreadContext context = ThreadContext.currentContextOrThrow();
- messagingService.registerHandler(newConnectionMessageSubject, (sender, payload) -> {
- long connectionId = Longs.fromByteArray(payload);
- CopycatTransportConnection connection = new CopycatTransportConnection(connectionId,
- CopycatTransport.Mode.SERVER,
- partitionId,
- CopycatTransport.toAddress(sender),
- messagingService,
- getOrCreateContext(context));
- connections.put(connectionId, connection);
- connection.closeListener(c -> connections.remove(connectionId, c));
- log.debug("Created new incoming connection[id={}] from {}", connectionId, sender);
- return CompletableFuture.supplyAsync(() -> {
- listener.accept(connection);
- // echo the connectionId back to indicate successful completion.
- return payload;
- }, context.executor());
- });
- context.execute(() -> listenFuture.complete(null));
+ listen(address, listener, context);
}
return listenFuture;
}
+ private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
+ messagingService.registerHandler(messageSubject, (sender, payload) -> {
+ try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
+ long connectionId = input.readLong();
+ AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
+ CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
+ newConnectionCreated.set(true);
+ CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
+ CopycatTransport.Mode.SERVER,
+ partitionId,
+ CopycatTransport.toAddress(sender),
+ messagingService,
+ getOrCreateContext(context));
+ log.debug("Created new incoming connection {}", connectionId);
+ newConnection.closeListener(c -> connections.remove(connectionId, c));
+ return newConnection;
+ });
+ byte[] request = IOUtils.toByteArray(input);
+ return CompletableFuture.supplyAsync(
+ () -> {
+ if (newConnectionCreated.get()) {
+ listener.accept(connection);
+ }
+ return connection;
+ }, context.executor()).thenCompose(c -> c.handle(request));
+ } catch (IOException e) {
+ return Tools.exceptionalFuture(e);
+ }
+ });
+ context.execute(() -> {
+ listenFuture.complete(null);
+ });
+ }
+
@Override
public CompletableFuture<Void> close() {
- messagingService.unregisterHandler(newConnectionMessageSubject);
- messagingService.unregisterHandler(protocolMessageSubject);
+ messagingService.unregisterHandler(messageSubject);
executorService.shutdown();
return CompletableFuture.completedFuture(null);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 54abda2..45bd171 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -105,14 +105,16 @@
@Override
public CompletableFuture<Void> leave(PartitionId partitionId) {
- // TODO: Implement
- return Tools.exceptionalFuture(new UnsupportedOperationException());
+ return partitions.get(partitionId)
+ .server()
+ .map(server -> server.close())
+ .orElse(CompletableFuture.completedFuture(null));
}
@Override
public CompletableFuture<Void> join(PartitionId partitionId) {
- // TODO: Implement
- return Tools.exceptionalFuture(new UnsupportedOperationException());
+ return partitions.get(partitionId)
+ .open();
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 41cdf7c..3b142a7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -72,10 +72,22 @@
this.logFolder = logFolder;
}
+ /**
+ * Returns the partition client instance.
+ * @return client
+ */
public StoragePartitionClient client() {
return client;
}
+ /**
+ * Returns the optional server instance.
+ * @return server
+ */
+ public Optional<StoragePartitionServer> server() {
+ return server;
+ }
+
@Override
public CompletableFuture<Void> open() {
return openServer().thenAccept(s -> server = Optional.ofNullable(s))
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index d674ebe..e6669dc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -114,8 +114,7 @@
.withTransport(transport.get())
.withStateMachine(() -> new ResourceManagerState(registry))
.withStorage(Storage.builder()
- // FIXME: StorageLevel should be DISK
- .withStorageLevel(StorageLevel.MEMORY)
+ .withStorageLevel(StorageLevel.DISK)
.withCompactionThreads(1)
.withDirectory(dataFolder)
.withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index df44b48..4c065a5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -65,7 +65,7 @@
@Override
public CompletableFuture<AtomixConsistentMap> open() {
return super.open().thenApply(result -> {
- client.session().onEvent(CHANGE_SUBJECT, this::handleEvent);
+ client.onEvent(CHANGE_SUBJECT, this::handleEvent);
return result;
});
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index 913c3bc..4f912da 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -22,8 +22,8 @@
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.util.Assert;
-import io.atomix.copycat.client.Command;
-import io.atomix.copycat.client.Query;
+import io.atomix.copycat.Command;
+import io.atomix.copycat.Query;
import java.util.Collection;
import java.util.Map;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 9c9b019..a6e6ca0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -18,7 +18,7 @@
import static org.onosproject.store.service.MapEvent.Type.INSERT;
import static org.onosproject.store.service.MapEvent.Type.REMOVE;
import static org.onosproject.store.service.MapEvent.Type.UPDATE;
-import io.atomix.copycat.client.session.Session;
+import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
@@ -322,8 +322,8 @@
commit.session()
.onStateChange(
state -> {
- if (state == Session.State.CLOSED
- || state == Session.State.EXPIRED) {
+ if (state == ServerSession.State.CLOSED
+ || state == ServerSession.State.EXPIRED) {
Commit<? extends Listen> listener = listeners.remove(sessionId);
if (listener != null) {
listener.close();
@@ -503,21 +503,21 @@
}
@Override
- public void register(Session session) {
+ public void register(ServerSession session) {
}
@Override
- public void unregister(Session session) {
+ public void unregister(ServerSession session) {
closeListener(session.id());
}
@Override
- public void expire(Session session) {
+ public void expire(ServerSession session) {
closeListener(session.id());
}
@Override
- public void close(Session session) {
+ public void close(ServerSession session) {
closeListener(session.id());
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index b7e48fa..9995c4d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -57,7 +57,7 @@
@Override
public CompletableFuture<AtomixLeaderElector> open() {
return super.open().thenApply(result -> {
- client.session().onEvent("change", this::handleEvent);
+ client.onEvent("change", this::handleEvent);
return result;
});
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
index e7de783..235bd07 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
@@ -32,8 +32,8 @@
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.util.Assert;
-import io.atomix.copycat.client.Command;
-import io.atomix.copycat.client.Query;
+import io.atomix.copycat.Command;
+import io.atomix.copycat.Query;
/**
* {@link AtomixLeaderElector} resource state machine operations.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index 9b58226..23dcf52 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -16,7 +16,7 @@
package org.onosproject.store.primitives.resources.impl;
import static org.slf4j.LoggerFactory.getLogger;
-import io.atomix.copycat.client.session.Session;
+import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
@@ -265,7 +265,7 @@
return electionState == null ? new LinkedList<>() : electionState.candidates();
}
- private void onSessionEnd(Session session) {
+ private void onSessionEnd(ServerSession session) {
Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
if (listener != null) {
listener.close();
@@ -337,7 +337,7 @@
this.termStartTime = termStartTime;
}
- public ElectionState cleanup(Session session, Supplier<Long> termCounter) {
+ public ElectionState cleanup(ServerSession session, Supplier<Long> termCounter) {
Optional<Registration> registration =
registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
if (registration.isPresent()) {
@@ -409,21 +409,21 @@
}
@Override
- public void register(Session session) {
+ public void register(ServerSession session) {
}
@Override
- public void unregister(Session session) {
+ public void unregister(ServerSession session) {
onSessionEnd(session);
}
@Override
- public void expire(Session session) {
+ public void expire(ServerSession session) {
onSessionEnd(session);
}
@Override
- public void close(Session session) {
+ public void close(ServerSession session) {
onSessionEnd(session);
}
diff --git a/pom.xml b/pom.xml
index 3669294..75f5c06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,7 @@
<onos-build-conf.version>1.2-SNAPSHOT</onos-build-conf.version>
<netty4.version>4.0.33.Final</netty4.version>
<!-- TODO: replace with final release version when it is out -->
- <atomix.version>1.0.0-rc1</atomix.version>
+ <atomix.version>1.0.0-rc2</atomix.version>
<copycat.version>0.5.1.onos</copycat.version>
<openflowj.version>0.9.1.onos</openflowj.version>
<onos-maven-plugin.version>1.8-SNAPSHOT</onos-maven-plugin.version>