[ONOS-6594] Upgrade to Atomix 2.0.0
Change-Id: I6534bca1c8570b4e017f682953b876da29146675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
deleted file mode 100644
index 2c4ee73..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.serializer.TypeSerializerFactory;
-import io.atomix.manager.util.ResourceManagerTypeResolver;
-import io.atomix.variables.internal.LongCommands;
-import org.onlab.util.Match;
-import org.onosproject.cluster.Leader;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.Change;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapFactory;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapFactory;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapFactory;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeFactory;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorFactory;
-import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueFactory;
-import org.onosproject.store.primitives.resources.impl.CommitResult;
-import org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult;
-import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
-import org.onosproject.store.primitives.resources.impl.PrepareResult;
-import org.onosproject.store.primitives.resources.impl.RollbackResult;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.DocumentPath;
-import org.onosproject.store.service.DocumentTreeEvent;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.MultimapEvent;
-import org.onosproject.store.service.Task;
-import org.onosproject.store.service.Versioned;
-import org.onosproject.store.service.WorkQueueStats;
-
-import java.util.Arrays;
-import java.util.Optional;
-
-/**
- * Serializer utility for Atomix Catalyst.
- */
-public final class CatalystSerializers {
-
- private CatalystSerializers() {
- }
-
- public static Serializer getSerializer() {
- Serializer serializer = new Serializer();
- TypeSerializerFactory factory =
- new DefaultCatalystTypeSerializerFactory(
- org.onosproject.store.service.Serializer.using(Arrays.asList((KryoNamespaces.API)),
- MapEntryUpdateResult.class,
- MapEntryUpdateResult.Status.class,
- Transaction.State.class,
- PrepareResult.class,
- CommitResult.class,
- DocumentPath.class,
- DocumentTreeUpdateResult.class,
- DocumentTreeUpdateResult.Status.class,
- DocumentTreeEvent.class,
- DocumentTreeEvent.Type.class,
- RollbackResult.class));
- // ONOS classes
- serializer.register(Change.class, factory);
- serializer.register(Leader.class, factory);
- serializer.register(Leadership.class, factory);
- serializer.register(NodeId.class, factory);
- serializer.register(Match.class, factory);
- serializer.register(MapEntryUpdateResult.class, factory);
- serializer.register(MapEntryUpdateResult.Status.class, factory);
- serializer.register(Transaction.State.class, factory);
- serializer.register(PrepareResult.class, factory);
- serializer.register(CommitResult.class, factory);
- serializer.register(RollbackResult.class, factory);
- serializer.register(TransactionId.class, factory);
- serializer.register(MapUpdate.class, factory);
- serializer.register(MapUpdate.Type.class, factory);
- serializer.register(TransactionLog.class, factory);
- serializer.register(Versioned.class, factory);
- serializer.register(MapEvent.class, factory);
- serializer.register(MultimapEvent.class, factory);
- serializer.register(MultimapEvent.Type.class, factory);
- serializer.register(Task.class, factory);
- serializer.register(WorkQueueStats.class, factory);
- serializer.register(DocumentPath.class, factory);
- serializer.register(DocumentTreeUpdateResult.class, factory);
- serializer.register(DocumentTreeUpdateResult.Status.class, factory);
- serializer.register(DocumentTreeEvent.class, factory);
- serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
- serializer.register(ImmutableList.of().getClass(), factory);
- serializer.register(ImmutableList.of("a").getClass(), factory);
- serializer.register(Arrays.asList().getClass(), factory);
- serializer.register(HashMultiset.class, factory);
- serializer.register(Optional.class, factory);
-
- serializer.resolve(new LongCommands.TypeResolver());
- serializer.resolve(new AtomixConsistentMapCommands.TypeResolver());
- serializer.resolve(new AtomixLeaderElectorCommands.TypeResolver());
- serializer.resolve(new AtomixWorkQueueCommands.TypeResolver());
- serializer.resolve(new AtomixDocumentTreeCommands.TypeResolver());
- serializer.resolve(new ResourceManagerTypeResolver());
- serializer.resolve(new AtomixConsistentTreeMapCommands.TypeResolver());
- serializer.resolve(new AtomixConsistentMultimapCommands.TypeResolver());
-
- serializer.registerClassLoader(AtomixConsistentMapFactory.class)
- .registerClassLoader(AtomixLeaderElectorFactory.class)
- .registerClassLoader(AtomixWorkQueueFactory.class)
- .registerClassLoader(AtomixDocumentTreeFactory.class)
- .registerClassLoader(AtomixConsistentTreeMapFactory.class)
- .registerClassLoader(AtomixConsistentSetMultimapFactory.class);
-
- return serializer;
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
deleted file mode 100644
index fc94dd6..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Client;
-import io.atomix.catalyst.transport.Server;
-import io.atomix.catalyst.transport.Transport;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingService;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Map;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Copycat transport implementation built on {@link MessagingService}.
- */
-public class CopycatTransport implements Transport {
- private final PartitionId partitionId;
- private final MessagingService messagingService;
- private static final Map<Address, Endpoint> EP_LOOKUP_CACHE = Maps.newConcurrentMap();
-
- static final byte MESSAGE = 0x01;
- static final byte CONNECT = 0x02;
- static final byte CLOSE = 0x03;
-
- static final byte SUCCESS = 0x01;
- static final byte FAILURE = 0x02;
-
- public CopycatTransport(PartitionId partitionId, MessagingService messagingService) {
- this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
- this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
- }
-
- @Override
- public Client client() {
- return new CopycatTransportClient(partitionId, messagingService);
- }
-
- @Override
- public Server server() {
- return new CopycatTransportServer(partitionId, messagingService);
- }
-
- @Override
- public String toString() {
- return toStringHelper(this).toString();
- }
-
- /**
- * Maps {@link Address address} to {@link Endpoint endpoint}.
- * @param address address
- * @return end point
- */
- static Endpoint toEndpoint(Address address) {
- return EP_LOOKUP_CACHE.computeIfAbsent(address, a -> {
- try {
- return new Endpoint(IpAddress.valueOf(InetAddress.getByName(a.host())), a.port());
- } catch (UnknownHostException e) {
- Throwables.propagate(e);
- return null;
- }
- });
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
deleted file mode 100644
index afa98cc..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
-import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Client;
-import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.transport.TransportException;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingException;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.ConnectException;
-import java.nio.ByteBuffer;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.store.primitives.impl.CopycatTransport.CONNECT;
-import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
-
-/**
- * Copycat transport client implementation.
- */
-public class CopycatTransportClient implements Client {
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final PartitionId partitionId;
- private final String serverSubject;
- private final MessagingService messagingService;
- private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
-
- public CopycatTransportClient(PartitionId partitionId, MessagingService messagingService) {
- this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
- this.serverSubject = String.format("onos-copycat-%s", partitionId);
- this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
- }
-
- @Override
- public CompletableFuture<Connection> connect(Address address) {
- CompletableFuture<Connection> future = new CompletableFuture<>();
- ThreadContext context = ThreadContext.currentContextOrThrow();
- Endpoint endpoint = CopycatTransport.toEndpoint(address);
-
- log.debug("Connecting to {}", address);
-
- ByteBuffer requestBuffer = ByteBuffer.allocate(1);
- requestBuffer.put(CONNECT);
-
- // Send a connect request to the server to get a unique connection ID.
- messagingService.sendAndReceive(endpoint, serverSubject, requestBuffer.array(), context.executor())
- .whenComplete((payload, error) -> {
- Throwable wrappedError = error;
- if (error != null) {
- Throwable rootCause = Throwables.getRootCause(error);
- if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
- wrappedError = new TransportException(error);
- }
- // TODO ONOS-6788 we might consider demoting this warning during startup when there is
- // a race between the server registering handlers and the client sending messages
- log.warn("Connection to {} failed! Reason: {}", address, wrappedError);
- future.completeExceptionally(wrappedError);
- } else {
- // If the connection is successful, the server will send back a
- // connection ID indicating where to send messages for the connection.
- ByteBuffer responseBuffer = ByteBuffer.wrap(payload);
- if (responseBuffer.get() == SUCCESS) {
- long connectionId = responseBuffer.getLong();
- CopycatTransportConnection connection = new CopycatTransportConnection(
- connectionId,
- CopycatTransportConnection.Mode.CLIENT,
- partitionId,
- endpoint,
- messagingService,
- context);
- connection.onClose(connections::remove);
- connections.add(connection);
- future.complete(connection);
- log.debug("Created connection {}-{} to {}", partitionId, connectionId, address);
- } else {
- log.warn("Connection to {} failed!");
- future.completeExceptionally(new ConnectException());
- }
- }
- });
- return future;
- }
-
- @Override
- public CompletableFuture<Void> close() {
- return CompletableFuture.allOf(connections.stream().map(Connection::close).toArray(CompletableFuture[]::new));
- }
-
- @Override
- public String toString() {
- return toStringHelper(this)
- .add("partitionId", partitionId)
- .toString();
- }
-}
-
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
deleted file mode 100644
index a3a8539..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import com.google.common.base.Throwables;
-import io.atomix.catalyst.concurrent.Listener;
-import io.atomix.catalyst.concurrent.Listeners;
-import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.serializer.SerializationException;
-import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.transport.TransportException;
-import io.atomix.catalyst.util.reference.ReferenceCounted;
-import org.apache.commons.io.IOUtils;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingException;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.store.primitives.impl.CopycatTransport.CLOSE;
-import static org.onosproject.store.primitives.impl.CopycatTransport.FAILURE;
-import static org.onosproject.store.primitives.impl.CopycatTransport.MESSAGE;
-import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
-
-/**
- * Base Copycat Transport connection.
- */
-public class CopycatTransportConnection implements Connection {
- private static final int MAX_MESSAGE_SIZE = 1024 * 1024;
-
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final long connectionId;
- private final String localSubject;
- private final String remoteSubject;
- private final PartitionId partitionId;
- private final Endpoint endpoint;
- private final MessagingService messagingService;
- private final ThreadContext context;
- private final Map<Class, InternalHandler> handlers = new ConcurrentHashMap<>();
- private final Listeners<Throwable> exceptionListeners = new Listeners<>();
- private final Listeners<Connection> closeListeners = new Listeners<>();
-
- CopycatTransportConnection(
- long connectionId,
- Mode mode,
- PartitionId partitionId,
- Endpoint endpoint,
- MessagingService messagingService,
- ThreadContext context) {
- this.connectionId = connectionId;
- this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
- this.localSubject = mode.getLocalSubject(partitionId, connectionId);
- this.remoteSubject = mode.getRemoteSubject(partitionId, connectionId);
- this.endpoint = checkNotNull(endpoint, "endpoint cannot be null");
- this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
- this.context = checkNotNull(context, "context cannot be null");
- messagingService.registerHandler(localSubject, this::handle);
- }
-
- @Override
- public CompletableFuture<Void> send(Object message) {
- ThreadContext context = ThreadContext.currentContextOrThrow();
- CompletableFuture<Void> future = new CompletableFuture<>();
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- DataOutputStream dos = new DataOutputStream(baos);
- dos.writeByte(MESSAGE);
- context.serializer().writeObject(message, baos);
- if (message instanceof ReferenceCounted) {
- ((ReferenceCounted<?>) message).release();
- }
-
- byte[] bytes = baos.toByteArray();
- if (bytes.length > MAX_MESSAGE_SIZE) {
- throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE);
- }
- messagingService.sendAsync(endpoint, remoteSubject, bytes)
- .whenComplete((r, e) -> {
- if (e != null) {
- context.executor().execute(() -> future.completeExceptionally(e));
- } else {
- context.executor().execute(() -> future.complete(null));
- }
- });
- } catch (SerializationException | IOException e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
- public <T, U> CompletableFuture<U> sendAndReceive(T message) {
- ThreadContext context = ThreadContext.currentContextOrThrow();
- CompletableFuture<U> future = new CompletableFuture<>();
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- DataOutputStream dos = new DataOutputStream(baos);
- dos.writeByte(MESSAGE);
- context.serializer().writeObject(message, baos);
- if (message instanceof ReferenceCounted) {
- ((ReferenceCounted<?>) message).release();
- }
-
- byte[] bytes = baos.toByteArray();
- if (bytes.length > MAX_MESSAGE_SIZE) {
- throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE);
- }
- messagingService.sendAndReceive(endpoint,
- remoteSubject,
- bytes,
- context.executor())
- .whenComplete((response, error) -> handleResponse(response, error, future));
- } catch (SerializationException | IOException e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
- /**
- * Handles a response received from the other side of the connection.
- */
- private <T> void handleResponse(
- byte[] response,
- Throwable error,
- CompletableFuture<T> future) {
- if (error != null) {
- Throwable rootCause = Throwables.getRootCause(error);
- if (rootCause instanceof MessagingException.NoRemoteHandler) {
- future.completeExceptionally(new TransportException(error));
- close(rootCause);
- } else if (rootCause instanceof SocketException) {
- future.completeExceptionally(new TransportException(error));
- } else {
- future.completeExceptionally(error);
- }
- return;
- }
-
- checkNotNull(response);
- InputStream input = new ByteArrayInputStream(response);
- try {
- byte status = (byte) input.read();
- if (status == FAILURE) {
- Throwable t = context.serializer().readObject(input);
- future.completeExceptionally(t);
- } else {
- try {
- future.complete(context.serializer().readObject(input));
- } catch (SerializationException e) {
- future.completeExceptionally(e);
- }
- }
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- }
-
- /**
- * Handles a message sent to the connection.
- */
- private CompletableFuture<byte[]> handle(Endpoint sender, byte[] payload) {
- try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
- byte type = input.readByte();
- switch (type) {
- case MESSAGE:
- return handleMessage(IOUtils.toByteArray(input));
- case CLOSE:
- return handleClose();
- default:
- throw new IllegalStateException("Invalid message type");
- }
- } catch (IOException e) {
- Throwables.propagate(e);
- return null;
- }
- }
-
- /**
- * Handles a message from the other side of the connection.
- */
- @SuppressWarnings("unchecked")
- private CompletableFuture<byte[]> handleMessage(byte[] message) {
- try {
- Object request = context.serializer().readObject(new ByteArrayInputStream(message));
- InternalHandler handler = handlers.get(request.getClass());
- if (handler == null) {
- log.warn("No handler registered on connection {}-{} for type {}",
- partitionId, connectionId, request.getClass());
- return Tools.exceptionalFuture(new IllegalStateException(
- "No handler registered for " + request.getClass()));
- }
-
- return handler.handle(request).handle((result, error) -> {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- baos.write(error != null ? FAILURE : SUCCESS);
- context.serializer().writeObject(error != null ? error : result, baos);
- byte[] bytes = baos.toByteArray();
- if (bytes.length > MAX_MESSAGE_SIZE) {
- throw new IllegalArgumentException("response exceeds maximum message size " + MAX_MESSAGE_SIZE);
- }
- return bytes;
- } catch (IOException e) {
- Throwables.propagate(e);
- return null;
- }
- });
- } catch (Exception e) {
- return Tools.exceptionalFuture(e);
- }
- }
-
- /**
- * Handles a close request from the other side of the connection.
- */
- private CompletableFuture<byte[]> handleClose() {
- CompletableFuture<byte[]> future = new CompletableFuture<>();
- context.executor().execute(() -> {
- close(null);
- ByteBuffer responseBuffer = ByteBuffer.allocate(1);
- responseBuffer.put(SUCCESS);
- future.complete(responseBuffer.array());
- });
- return future;
- }
-
- @Override
- public <T, U> Connection handler(Class<T> type, Consumer<T> handler) {
- return handler(type, r -> {
- handler.accept(r);
- return null;
- });
- }
-
- @Override
- public <T, U> Connection handler(Class<T> type, Function<T, CompletableFuture<U>> handler) {
- if (log.isTraceEnabled()) {
- log.trace("Registered handler on connection {}-{}: {}", partitionId, connectionId, type);
- }
- handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
- return this;
- }
-
- @Override
- public Listener<Throwable> onException(Consumer<Throwable> consumer) {
- return exceptionListeners.add(consumer);
- }
-
- @Override
- public Listener<Connection> onClose(Consumer<Connection> consumer) {
- return closeListeners.add(consumer);
- }
-
- @Override
- public CompletableFuture<Void> close() {
- log.debug("Closing connection {}-{}", partitionId, connectionId);
-
- ByteBuffer requestBuffer = ByteBuffer.allocate(1);
- requestBuffer.put(CLOSE);
-
- ThreadContext context = ThreadContext.currentContextOrThrow();
- CompletableFuture<Void> future = new CompletableFuture<>();
- messagingService.sendAndReceive(endpoint, remoteSubject, requestBuffer.array(), context.executor())
- .whenComplete((payload, error) -> {
- close(error);
- Throwable wrappedError = error;
- if (error != null) {
- Throwable rootCause = Throwables.getRootCause(error);
- if (rootCause instanceof MessagingException.NoRemoteHandler) {
- wrappedError = new TransportException(error);
- }
- future.completeExceptionally(wrappedError);
- } else {
- ByteBuffer responseBuffer = ByteBuffer.wrap(payload);
- if (responseBuffer.get() == SUCCESS) {
- future.complete(null);
- } else {
- future.completeExceptionally(new TransportException("Failed to close connection"));
- }
- }
- });
- return future;
- }
-
- /**
- * Cleans up the connection, unregistering handlers registered on the MessagingService.
- */
- private void close(Throwable error) {
- log.debug("Connection {}-{} closed", partitionId, connectionId);
- messagingService.unregisterHandler(localSubject);
- if (error != null) {
- exceptionListeners.accept(error);
- }
- closeListeners.accept(this);
- }
-
- /**
- * Connection mode used to indicate whether this side of the connection is
- * a client or server.
- */
- enum Mode {
-
- /**
- * Represents the client side of a bi-directional connection.
- */
- CLIENT {
- @Override
- String getLocalSubject(PartitionId partitionId, long connectionId) {
- return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
- }
-
- @Override
- String getRemoteSubject(PartitionId partitionId, long connectionId) {
- return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
- }
- },
-
- /**
- * Represents the server side of a bi-directional connection.
- */
- SERVER {
- @Override
- String getLocalSubject(PartitionId partitionId, long connectionId) {
- return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
- }
-
- @Override
- String getRemoteSubject(PartitionId partitionId, long connectionId) {
- return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
- }
- };
-
- /**
- * Returns the local messaging service subject for the connection in this mode.
- * Subjects generated by the connection mode are guaranteed to be globally unique.
- *
- * @param partitionId the partition ID to which the connection belongs.
- * @param connectionId the connection ID.
- * @return the globally unique local subject for the connection.
- */
- abstract String getLocalSubject(PartitionId partitionId, long connectionId);
-
- /**
- * Returns the remote messaging service subject for the connection in this mode.
- * Subjects generated by the connection mode are guaranteed to be globally unique.
- *
- * @param partitionId the partition ID to which the connection belongs.
- * @param connectionId the connection ID.
- * @return the globally unique remote subject for the connection.
- */
- abstract String getRemoteSubject(PartitionId partitionId, long connectionId);
- }
-
- /**
- * Internal container for a handler/context pair.
- */
- private static class InternalHandler {
- private final Function handler;
- private final ThreadContext context;
-
- InternalHandler(Function handler, ThreadContext context) {
- this.handler = handler;
- this.context = context;
- }
-
- @SuppressWarnings("unchecked")
- CompletableFuture<Object> handle(Object message) {
- CompletableFuture<Object> future = new CompletableFuture<>();
- context.executor().execute(() -> {
- CompletableFuture<Object> responseFuture = (CompletableFuture<Object>) handler.apply(message);
- if (responseFuture != null) {
- responseFuture.whenComplete((r, e) -> {
- if (e != null) {
- future.completeExceptionally((Throwable) e);
- } else {
- future.complete(r);
- }
- });
- }
- });
- return future;
- }
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
deleted file mode 100644
index 8de05a3..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import com.google.common.collect.Sets;
-import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.transport.Server;
-import org.apache.commons.lang3.RandomUtils;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.store.primitives.impl.CopycatTransport.CONNECT;
-import static org.onosproject.store.primitives.impl.CopycatTransport.FAILURE;
-import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
-
-/**
- * Copycat transport server implementation.
- */
-public class CopycatTransportServer implements Server {
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final PartitionId partitionId;
- private final String serverSubject;
- private final MessagingService messagingService;
- private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
-
- public CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
- this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
- this.serverSubject = String.format("onos-copycat-%s", partitionId);
- this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
- }
-
- @Override
- public CompletableFuture<Void> listen(Address address, Consumer<Connection> consumer) {
- ThreadContext context = ThreadContext.currentContextOrThrow();
- messagingService.registerHandler(serverSubject, (sender, payload) -> {
-
- // Only connect messages can be sent to the server. Once a connect message
- // is received, the connection will register a separate handler for messaging.
- ByteBuffer requestBuffer = ByteBuffer.wrap(payload);
- if (requestBuffer.get() != CONNECT) {
- ByteBuffer responseBuffer = ByteBuffer.allocate(1);
- responseBuffer.put(FAILURE);
- return CompletableFuture.completedFuture(responseBuffer.array());
- }
-
- // Create the connection and ensure state is cleaned up when the connection is closed.
- long connectionId = RandomUtils.nextLong();
- CopycatTransportConnection connection = new CopycatTransportConnection(
- connectionId,
- CopycatTransportConnection.Mode.SERVER,
- partitionId,
- sender,
- messagingService,
- context);
- connection.onClose(connections::remove);
- connections.add(connection);
-
- CompletableFuture<byte[]> future = new CompletableFuture<>();
-
- // We need to ensure the connection event is called on the Copycat thread
- // and that the future is not completed until the Copycat server has been
- // able to register message handlers, otherwise some messages can be received
- // prior to any handlers being registered.
- context.executor().execute(() -> {
- log.debug("Created connection {}-{}", partitionId, connectionId);
- consumer.accept(connection);
-
- ByteBuffer responseBuffer = ByteBuffer.allocate(9);
- responseBuffer.put(SUCCESS);
- responseBuffer.putLong(connectionId);
- future.complete(responseBuffer.array());
- });
- return future;
- });
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> close() {
- return CompletableFuture.allOf(connections.stream().map(Connection::close).toArray(CompletableFuture[]::new));
- }
-
- @Override
- public String toString() {
- return toStringHelper(this)
- .add("partitionId", partitionId)
- .toString();
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java
index a189eb2..45cf193 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java
@@ -32,6 +32,6 @@
@Override
public AsyncAtomicCounter build() {
- return primitiveCreator.newAsyncCounter(name(), executorSupplier());
+ return primitiveCreator.newAsyncCounter(name());
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
index c92ef22..309220a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
@@ -33,7 +33,7 @@
@Override
public AsyncAtomicCounterMap<K> buildAsyncMap() {
- return primitiveCreator.newAsyncAtomicCounterMap(name(), serializer(), executorSupplier());
+ return primitiveCreator.newAsyncAtomicCounterMap(name(), serializer());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java
index ac294f4..c6742cb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java
@@ -32,6 +32,6 @@
@Override
public AsyncAtomicIdGenerator build() {
- return primitiveCreator.newAsyncIdGenerator(name(), executorSupplier());
+ return primitiveCreator.newAsyncIdGenerator(name());
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
index a63fe4be..b17983c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
-import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.onosproject.store.service.AsyncAtomicValue;
@@ -38,12 +37,6 @@
}
@Override
- public AtomicValueBuilder<V> withExecutorSupplier(Supplier<Executor> executorSupplier) {
- mapBuilder.withExecutorSupplier(executorSupplier);
- return this;
- }
-
- @Override
public AsyncAtomicValue<V> build() {
return new DefaultAsyncAtomicValue<>(checkNotNull(name()),
checkNotNull(serializer()),
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
deleted file mode 100644
index b0d6841..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-import org.onosproject.store.service.Serializer;
-import org.slf4j.Logger;
-
-import com.google.common.base.Throwables;
-
-import io.atomix.catalyst.buffer.BufferInput;
-import io.atomix.catalyst.buffer.BufferOutput;
-import io.atomix.catalyst.serializer.TypeSerializer;
-import io.atomix.catalyst.serializer.TypeSerializerFactory;
-
-/**
- * {@link TypeSerializerFactory} for providing {@link TypeSerializer}s based on
- * {@code org.onosproject.store.service.Serializer}.
- */
-public class DefaultCatalystTypeSerializerFactory implements TypeSerializerFactory {
-
- private final Logger log = getLogger(getClass());
- private final TypeSerializer<?> typeSerializer;
-
- public DefaultCatalystTypeSerializerFactory(Serializer serializer) {
- typeSerializer = new InternalSerializer<>(serializer);
- }
-
- @Override
- public TypeSerializer<?> createSerializer(Class<?> clazz) {
- return typeSerializer;
- }
-
- private class InternalSerializer<T> implements TypeSerializer<T> {
-
- private final Serializer serializer;
-
- InternalSerializer(Serializer serializer) {
- this.serializer = serializer;
- }
-
- @Override
- public void write(T object, BufferOutput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
- try {
- byte[] payload = this.serializer.encode(object);
- buffer.writeInt(payload.length);
- buffer.write(payload);
- } catch (Exception e) {
- log.warn("Failed to serialize {}", object, e);
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public T read(Class<T> type, BufferInput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
- int size = buffer.readInt();
- try {
- byte[] payload = new byte[size];
- buffer.read(payload);
- return this.serializer.decode(payload);
- } catch (Exception e) {
- log.warn("Failed to deserialize as type {}. Payload size: {}", type, size, e);
- throw Throwables.propagate(e);
- }
- }
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
index 820174d..b5284de 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
@@ -41,7 +41,7 @@
@Override
public AsyncConsistentMap<K, V> buildAsyncMap() {
- AsyncConsistentMap<K, V> map = primitiveCreator.newAsyncConsistentMap(name(), serializer(), executorSupplier());
+ AsyncConsistentMap<K, V> map = primitiveCreator.newAsyncConsistentMap(name(), serializer());
map = relaxedReadConsistency() ? DistributedPrimitives.newCachingMap(map) : map;
map = readOnly() ? DistributedPrimitives.newUnmodifiableMap(map) : map;
return meteringEnabled() ? DistributedPrimitives.newMeteredMap(map) : map;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java
index ba7d673..4b23253 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java
@@ -36,7 +36,7 @@
@Override
public AsyncConsistentMultimap<K, V> buildMultimap() {
- return primitiveCreator.newAsyncConsistentSetMultimap(name(), serializer(), executorSupplier());
+ return primitiveCreator.newAsyncConsistentSetMultimap(name(), serializer());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java
index 5e2a8b4..2aa906a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java
@@ -35,7 +35,7 @@
@Override
public AsyncConsistentTreeMap<V> buildTreeMap() {
- return primitiveCreator.newAsyncConsistentTreeMap(name(), serializer(), executorSupplier());
+ return primitiveCreator.newAsyncConsistentTreeMap(name(), serializer());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java
index 5e95180..c17f91d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
-import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.onosproject.core.ApplicationId;
@@ -54,12 +53,6 @@
}
@Override
- public DistributedSetBuilder<E> withExecutorSupplier(Supplier<Executor> executorSupplier) {
- mapBuilder.withExecutorSupplier(executorSupplier);
- return this;
- }
-
- @Override
public DistributedSetBuilder<E> withPurgeOnUninstall() {
mapBuilder.withPurgeOnUninstall();
return this;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java
index 65c0504..8d21e60 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java
@@ -47,6 +47,6 @@
@Deprecated
@Override
public AsyncDocumentTree<V> build() {
- return primitiveCreator.newAsyncDocumentTree(name(), serializer(), executorSupplier());
+ return primitiveCreator.newAsyncDocumentTree(name(), serializer());
}
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
index 6f8f55d..69788f9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
@@ -32,6 +32,6 @@
@Override
public AsyncLeaderElector build() {
- return primitiveCreator.newAsyncLeaderElector(name(), executorSupplier());
+ return primitiveCreator.newAsyncLeaderElector(name());
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingCopycatClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingCopycatClient.java
deleted file mode 100644
index 8955a6d..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingCopycatClient.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import io.atomix.catalyst.concurrent.Listener;
-import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Transport;
-import io.atomix.copycat.Command;
-import io.atomix.copycat.Query;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.copycat.session.Session;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-
-/**
- * {@code CopycatClient} that merely delegates control to
- * another CopycatClient.
- */
-public class DelegatingCopycatClient implements CopycatClient {
-
- protected final CopycatClient client;
-
- DelegatingCopycatClient(CopycatClient client) {
- this.client = client;
- }
-
- @Override
- public State state() {
- return client.state();
- }
-
- @Override
- public Listener<State> onStateChange(Consumer<State> callback) {
- return client.onStateChange(callback);
- }
-
- @Override
- public ThreadContext context() {
- return client.context();
- }
-
- @Override
- public Transport transport() {
- return client.transport();
- }
-
- @Override
- public Serializer serializer() {
- return client.serializer();
- }
-
- @Override
- public Session session() {
- return client.session();
- }
-
- @Override
- public <T> CompletableFuture<T> submit(Command<T> command) {
- return client.submit(command);
- }
-
- @Override
- public <T> CompletableFuture<T> submit(Query<T> query) {
- return client.submit(query);
- }
-
- @Override
- public Listener<Void> onEvent(String event, Runnable callback) {
- return client.onEvent(event, callback);
- }
-
- @Override
- public <T> Listener<T> onEvent(String event, Consumer<T> callback) {
- return client.onEvent(event, callback);
- }
-
- @Override
- public CompletableFuture<CopycatClient> connect(Collection<Address> members) {
- return client.connect(members);
- }
-
- @Override
- public CompletableFuture<CopycatClient> recover() {
- return client.recover();
- }
-
- @Override
- public CompletableFuture<Void> close() {
- return client.close();
- }
-}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java
deleted file mode 100644
index 504fa75..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import org.onosproject.store.service.AsyncAtomicCounter;
-
-/**
- * {@link AsyncAtomicCounter} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingAsyncAtomicCounter extends ExecutingDistributedPrimitive implements AsyncAtomicCounter {
- private final AsyncAtomicCounter delegateCounter;
-
- public ExecutingAsyncAtomicCounter(
- AsyncAtomicCounter delegateCounter, Executor orderedExecutor, Executor threadPoolExecutor) {
- super(delegateCounter, orderedExecutor, threadPoolExecutor);
- this.delegateCounter = delegateCounter;
- }
-
- @Override
- public CompletableFuture<Long> incrementAndGet() {
- return asyncFuture(delegateCounter.incrementAndGet());
- }
-
- @Override
- public CompletableFuture<Long> getAndIncrement() {
- return asyncFuture(delegateCounter.getAndIncrement());
- }
-
- @Override
- public CompletableFuture<Long> getAndAdd(long delta) {
- return asyncFuture(delegateCounter.getAndAdd(delta));
- }
-
- @Override
- public CompletableFuture<Long> addAndGet(long delta) {
- return asyncFuture(delegateCounter.addAndGet(delta));
- }
-
- @Override
- public CompletableFuture<Long> get() {
- return asyncFuture(delegateCounter.get());
- }
-
- @Override
- public CompletableFuture<Void> set(long value) {
- return asyncFuture(delegateCounter.set(value));
- }
-
- @Override
- public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
- return asyncFuture(delegateCounter.compareAndSet(expectedValue, updateValue));
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java
deleted file mode 100644
index a17a2f0..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import org.onosproject.store.service.AsyncAtomicCounterMap;
-
-/**
- * {@link org.onosproject.store.service.AsyncAtomicCounterMap} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingAsyncAtomicCounterMap<K>
- extends ExecutingDistributedPrimitive implements AsyncAtomicCounterMap<K> {
- private final AsyncAtomicCounterMap<K> delegateMap;
-
- public ExecutingAsyncAtomicCounterMap(
- AsyncAtomicCounterMap<K> delegateMap, Executor orderedExecutor, Executor threadPoolExecutor) {
- super(delegateMap, orderedExecutor, threadPoolExecutor);
- this.delegateMap = delegateMap;
- }
-
- @Override
- public CompletableFuture<Long> incrementAndGet(K key) {
- return asyncFuture(delegateMap.incrementAndGet(key));
- }
-
- @Override
- public CompletableFuture<Long> decrementAndGet(K key) {
- return asyncFuture(delegateMap.decrementAndGet(key));
- }
-
- @Override
- public CompletableFuture<Long> getAndIncrement(K key) {
- return asyncFuture(delegateMap.getAndIncrement(key));
- }
-
- @Override
- public CompletableFuture<Long> getAndDecrement(K key) {
- return asyncFuture(delegateMap.getAndDecrement(key));
- }
-
- @Override
- public CompletableFuture<Long> addAndGet(K key, long delta) {
- return asyncFuture(delegateMap.addAndGet(key, delta));
- }
-
- @Override
- public CompletableFuture<Long> getAndAdd(K key, long delta) {
- return asyncFuture(delegateMap.getAndAdd(key, delta));
- }
-
- @Override
- public CompletableFuture<Long> get(K key) {
- return asyncFuture(delegateMap.get(key));
- }
-
- @Override
- public CompletableFuture<Long> put(K key, long newValue) {
- return asyncFuture(delegateMap.put(key, newValue));
- }
-
- @Override
- public CompletableFuture<Long> putIfAbsent(K key, long newValue) {
- return asyncFuture(delegateMap.putIfAbsent(key, newValue));
- }
-
- @Override
- public CompletableFuture<Boolean> replace(K key, long expectedOldValue, long newValue) {
- return asyncFuture(delegateMap.replace(key, expectedOldValue, newValue));
- }
-
- @Override
- public CompletableFuture<Long> remove(K key) {
- return asyncFuture(delegateMap.remove(key));
- }
-
- @Override
- public CompletableFuture<Boolean> remove(K key, long value) {
- return asyncFuture(delegateMap.remove(key, value));
- }
-
- @Override
- public CompletableFuture<Integer> size() {
- return asyncFuture(delegateMap.size());
- }
-
- @Override
- public CompletableFuture<Boolean> isEmpty() {
- return asyncFuture(delegateMap.isEmpty());
- }
-
- @Override
- public CompletableFuture<Void> clear() {
- return asyncFuture(delegateMap.clear());
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicIdGenerator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicIdGenerator.java
deleted file mode 100644
index baf2b8a..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicIdGenerator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import org.onosproject.store.service.AsyncAtomicIdGenerator;
-
-/**
- * {@link AsyncAtomicIdGenerator} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingAsyncAtomicIdGenerator extends ExecutingDistributedPrimitive implements AsyncAtomicIdGenerator {
- private final AsyncAtomicIdGenerator delegateIdGenerator;
-
- public ExecutingAsyncAtomicIdGenerator(
- AsyncAtomicIdGenerator delegateIdGenerator, Executor orderedExecutor, Executor threadPoolExecutor) {
- super(delegateIdGenerator, orderedExecutor, threadPoolExecutor);
- this.delegateIdGenerator = delegateIdGenerator;
- }
-
- @Override
- public CompletableFuture<Long> nextId() {
- return asyncFuture(delegateIdGenerator.nextId());
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
deleted file mode 100644
index c8bba52..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import com.google.common.collect.Maps;
-import org.onosproject.store.service.AsyncAtomicValue;
-import org.onosproject.store.service.AtomicValueEventListener;
-
-/**
- * {@link AsyncAtomicValue} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingAsyncAtomicValue<V> extends ExecutingDistributedPrimitive implements AsyncAtomicValue<V> {
- private final AsyncAtomicValue<V> delegateValue;
- private final Executor orderedExecutor;
- private final Map<AtomicValueEventListener<V>, AtomicValueEventListener<V>> listenerMap = Maps.newConcurrentMap();
-
- public ExecutingAsyncAtomicValue(
- AsyncAtomicValue<V> delegateValue, Executor orderedExecutor, Executor threadPoolExecutor) {
- super(delegateValue, orderedExecutor, threadPoolExecutor);
- this.delegateValue = delegateValue;
- this.orderedExecutor = orderedExecutor;
- }
-
- @Override
- public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
- return asyncFuture(delegateValue.compareAndSet(expect, update));
- }
-
- @Override
- public CompletableFuture<V> get() {
- return asyncFuture(delegateValue.get());
- }
-
- @Override
- public CompletableFuture<V> getAndSet(V value) {
- return asyncFuture(delegateValue.getAndSet(value));
- }
-
- @Override
- public CompletableFuture<Void> set(V value) {
- return asyncFuture(delegateValue.set(value));
- }
-
- @Override
- public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
- AtomicValueEventListener<V> wrappedListener = e -> orderedExecutor.execute(() -> listener.event(e));
- listenerMap.put(listener, wrappedListener);
- return asyncFuture(delegateValue.addListener(wrappedListener));
- }
-
- @Override
- public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
- AtomicValueEventListener<V> wrappedListener = listenerMap.remove(listener);
- if (wrappedListener != null) {
- return asyncFuture(delegateValue.removeListener(wrappedListener));
- }
- return CompletableFuture.completedFuture(null);
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java
deleted file mode 100644
index d955121..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.BiFunction;
-import java.util.function.Predicate;
-
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
-import org.onosproject.store.service.Versioned;
-
-/**
- * An {@link org.onosproject.store.service.AsyncConsistentMap} that completes asynchronous calls on a provided
- * {@link Executor}.
- */
-public class ExecutingAsyncConsistentMap<K, V>
- extends ExecutingDistributedPrimitive implements AsyncConsistentMap<K, V> {
- private final AsyncConsistentMap<K, V> delegateMap;
-
- public ExecutingAsyncConsistentMap(
- AsyncConsistentMap<K, V> delegateMap, Executor orderedExecutor, Executor threadPoolExecutor) {
- super(delegateMap, orderedExecutor, threadPoolExecutor);
- this.delegateMap = delegateMap;
- }
-
- @Override
- public CompletableFuture<Integer> size() {
- return asyncFuture(delegateMap.size());
- }
-
- @Override
- public CompletableFuture<Boolean> containsKey(K key) {
- return asyncFuture(delegateMap.containsKey(key));
- }
-
- @Override
- public CompletableFuture<Boolean> containsValue(V value) {
- return asyncFuture(delegateMap.containsValue(value));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> get(K key) {
- return asyncFuture(delegateMap.get(key));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> getOrDefault(K key, V defaultValue) {
- return asyncFuture(delegateMap.getOrDefault(key, defaultValue));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> computeIf(
- K key, Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- return asyncFuture(delegateMap.computeIf(key, condition, remappingFunction));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> put(K key, V value) {
- return asyncFuture(delegateMap.put(key, value));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
- return asyncFuture(delegateMap.putAndGet(key, value));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> remove(K key) {
- return asyncFuture(delegateMap.remove(key));
- }
-
- @Override
- public CompletableFuture<Void> clear() {
- return asyncFuture(delegateMap.clear());
- }
-
- @Override
- public CompletableFuture<Set<K>> keySet() {
- return asyncFuture(delegateMap.keySet());
- }
-
- @Override
- public CompletableFuture<Collection<Versioned<V>>> values() {
- return asyncFuture(delegateMap.values());
- }
-
- @Override
- public CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet() {
- return asyncFuture(delegateMap.entrySet());
- }
-
- @Override
- public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
- return asyncFuture(delegateMap.putIfAbsent(key, value));
- }
-
- @Override
- public CompletableFuture<Boolean> remove(K key, V value) {
- return asyncFuture(delegateMap.remove(key, value));
- }
-
- @Override
- public CompletableFuture<Boolean> remove(K key, long version) {
- return asyncFuture(delegateMap.remove(key, version));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> replace(K key, V value) {
- return asyncFuture(delegateMap.replace(key, value));
- }
-
- @Override
- public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
- return asyncFuture(delegateMap.replace(key, oldValue, newValue));
- }
-
- @Override
- public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
- return asyncFuture(delegateMap.replace(key, oldVersion, newValue));
- }
-
- @Override
- public CompletableFuture<Version> begin(TransactionId transactionId) {
- return asyncFuture(delegateMap.begin(transactionId));
- }
-
- @Override
- public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
- return asyncFuture(delegateMap.prepare(transactionLog));
- }
-
- @Override
- public CompletableFuture<Void> commit(TransactionId transactionId) {
- return asyncFuture(delegateMap.commit(transactionId));
- }
-
- @Override
- public CompletableFuture<Void> rollback(TransactionId transactionId) {
- return asyncFuture(delegateMap.rollback(transactionId));
- }
-
- @Override
- public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
- return asyncFuture(delegateMap.prepareAndCommit(transactionLog));
- }
-
- @Override
- public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
- return addListener(listener);
- }
-
- @Override
- public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
- return asyncFuture(delegateMap.addListener(listener, executor));
- }
-
- @Override
- public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
- return asyncFuture(delegateMap.removeListener(listener));
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
deleted file mode 100644
index 2045586..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.MultimapEventListener;
-import org.onosproject.store.service.Versioned;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * {@link org.onosproject.store.service.AsyncConsistentMultimap} that executes asynchronous callbacks on a provided
- * {@link Executor}.
- */
-public class ExecutingAsyncConsistentMultimap<K, V>
- extends ExecutingDistributedPrimitive implements AsyncConsistentMultimap<K, V> {
- private final AsyncConsistentMultimap<K, V> delegateMap;
-
- public ExecutingAsyncConsistentMultimap(
- AsyncConsistentMultimap<K, V> delegateMap, Executor orderedExecutor, Executor threadPoolExecutor) {
- super(delegateMap, orderedExecutor, threadPoolExecutor);
- this.delegateMap = delegateMap;
- }
-
- @Override
- public CompletableFuture<Integer> size() {
- return asyncFuture(delegateMap.size());
- }
-
- @Override
- public CompletableFuture<Boolean> isEmpty() {
- return asyncFuture(delegateMap.isEmpty());
- }
-
- @Override
- public CompletableFuture<Boolean> containsKey(K key) {
- return asyncFuture(delegateMap.containsKey(key));
- }
-
- @Override
- public CompletableFuture<Boolean> containsValue(V value) {
- return asyncFuture(delegateMap.containsValue(value));
- }
-
- @Override
- public CompletableFuture<Boolean> containsEntry(K key, V value) {
- return asyncFuture(delegateMap.containsEntry(key, value));
- }
-
- @Override
- public CompletableFuture<Boolean> put(K key, V value) {
- return asyncFuture(delegateMap.put(key, value));
- }
-
- @Override
- public CompletableFuture<Boolean> remove(K key, V value) {
- return asyncFuture(delegateMap.remove(key, value));
- }
-
- @Override
- public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
- return asyncFuture(delegateMap.removeAll(key, values));
- }
-
- @Override
- public CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key) {
- return asyncFuture(delegateMap.removeAll(key));
- }
-
- @Override
- public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
- return asyncFuture(delegateMap.putAll(key, values));
- }
-
- @Override
- public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
- return asyncFuture(delegateMap.replaceValues(key, values));
- }
-
- @Override
- public CompletableFuture<Void> clear() {
- return asyncFuture(delegateMap.clear());
- }
-
- @Override
- public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
- return asyncFuture(delegateMap.get(key));
- }
-
- @Override
- public CompletableFuture<Set<K>> keySet() {
- return asyncFuture(delegateMap.keySet());
- }
-
- @Override
- public CompletableFuture<Multiset<K>> keys() {
- return asyncFuture(delegateMap.keys());
- }
-
- @Override
- public CompletableFuture<Multiset<V>> values() {
- return asyncFuture(delegateMap.values());
- }
-
- @Override
- public CompletableFuture<Collection<Map.Entry<K, V>>> entries() {
- return asyncFuture(delegateMap.entries());
- }
-
- @Override
- public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
- return asyncFuture(delegateMap.addListener(listener, executor));
- }
-
- @Override
- public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
- return asyncFuture(delegateMap.removeListener(listener));
- }
-
- @Override
- public CompletableFuture<Map<K, Collection<V>>> asMap() {
- return asyncFuture(delegateMap.asMap());
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java
deleted file mode 100644
index 3a4fe85..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.BiFunction;
-import java.util.function.Predicate;
-
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentTreeMap;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
-import org.onosproject.store.service.Versioned;
-
-/**
- * {@link org.onosproject.store.service.AsyncConsistentTreeMap} that executes asynchronous callbacks on a provided
- * {@link Executor}.
- */
-public class ExecutingAsyncConsistentTreeMap<V>
- extends ExecutingDistributedPrimitive implements AsyncConsistentTreeMap<V> {
- private final AsyncConsistentTreeMap<V> delegateMap;
-
- public ExecutingAsyncConsistentTreeMap(
- AsyncConsistentTreeMap<V> delegateMap, Executor orderedExecutor, Executor threadPoolExecutor) {
- super(delegateMap, orderedExecutor, threadPoolExecutor);
- this.delegateMap = delegateMap;
- }
-
- @Override
- public CompletableFuture<String> firstKey() {
- return asyncFuture(delegateMap.firstKey());
- }
-
- @Override
- public CompletableFuture<String> lastKey() {
- return asyncFuture(delegateMap.lastKey());
- }
-
- @Override
- public CompletableFuture<Map.Entry<String, Versioned<V>>> ceilingEntry(String key) {
- return asyncFuture(delegateMap.ceilingEntry(key));
- }
-
- @Override
- public CompletableFuture<Map.Entry<String, Versioned<V>>> floorEntry(String key) {
- return asyncFuture(delegateMap.floorEntry(key));
- }
-
- @Override
- public CompletableFuture<Map.Entry<String, Versioned<V>>> higherEntry(String key) {
- return asyncFuture(delegateMap.higherEntry(key));
- }
-
- @Override
- public CompletableFuture<Map.Entry<String, Versioned<V>>> lowerEntry(String key) {
- return asyncFuture(delegateMap.lowerEntry(key));
- }
-
- @Override
- public CompletableFuture<Map.Entry<String, Versioned<V>>> firstEntry() {
- return asyncFuture(delegateMap.firstEntry());
- }
-
- @Override
- public CompletableFuture<Integer> size() {
- return asyncFuture(delegateMap.size());
- }
-
- @Override
- public CompletableFuture<Map.Entry<String, Versioned<V>>> lastEntry() {
- return asyncFuture(delegateMap.lastEntry());
- }
-
- @Override
- public CompletableFuture<Map.Entry<String, Versioned<V>>> pollFirstEntry() {
- return asyncFuture(delegateMap.pollFirstEntry());
- }
-
- @Override
- public CompletableFuture<Boolean> containsKey(String key) {
- return asyncFuture(delegateMap.containsKey(key));
- }
-
- @Override
- public CompletableFuture<Map.Entry<String, Versioned<V>>> pollLastEntry() {
- return asyncFuture(delegateMap.pollLastEntry());
- }
-
- @Override
- public CompletableFuture<String> lowerKey(String key) {
- return asyncFuture(delegateMap.lowerKey(key));
- }
-
- @Override
- public CompletableFuture<Boolean> containsValue(V value) {
- return asyncFuture(delegateMap.containsValue(value));
- }
-
- @Override
- public CompletableFuture<String> floorKey(String key) {
- return asyncFuture(delegateMap.floorKey(key));
- }
-
- @Override
- public CompletableFuture<String> ceilingKey(String key) {
- return asyncFuture(delegateMap.ceilingKey(key));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> get(String key) {
- return asyncFuture(delegateMap.get(key));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> getOrDefault(String key, V defaultValue) {
- return asyncFuture(delegateMap.getOrDefault(key, defaultValue));
- }
-
- @Override
- public CompletableFuture<String> higherKey(String key) {
- return asyncFuture(delegateMap.higherKey(key));
- }
-
- @Override
- public CompletableFuture<NavigableSet<String>> navigableKeySet() {
- return asyncFuture(delegateMap.navigableKeySet());
- }
-
- @Override
- public CompletableFuture<NavigableMap<String, V>> subMap(
- String upperKey, String lowerKey, boolean inclusiveUpper, boolean inclusiveLower) {
- return asyncFuture(delegateMap.subMap(upperKey, lowerKey, inclusiveUpper, inclusiveLower));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> computeIf(
- String key, Predicate<? super V> condition,
- BiFunction<? super String, ? super V, ? extends V> remappingFunction) {
- return asyncFuture(delegateMap.computeIf(key, condition, remappingFunction));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> put(String key, V value) {
- return asyncFuture(delegateMap.put(key, value));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> putAndGet(String key, V value) {
- return asyncFuture(delegateMap.putAndGet(key, value));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> remove(String key) {
- return asyncFuture(delegateMap.remove(key));
- }
-
- @Override
- public CompletableFuture<Void> clear() {
- return asyncFuture(delegateMap.clear());
- }
-
- @Override
- public CompletableFuture<Set<String>> keySet() {
- return asyncFuture(delegateMap.keySet());
- }
-
- @Override
- public CompletableFuture<Collection<Versioned<V>>> values() {
- return asyncFuture(delegateMap.values());
- }
-
- @Override
- public CompletableFuture<Set<Map.Entry<String, Versioned<V>>>> entrySet() {
- return asyncFuture(delegateMap.entrySet());
- }
-
- @Override
- public CompletableFuture<Versioned<V>> putIfAbsent(String key, V value) {
- return asyncFuture(delegateMap.putIfAbsent(key, value));
- }
-
- @Override
- public CompletableFuture<Boolean> remove(String key, V value) {
- return asyncFuture(delegateMap.remove(key, value));
- }
-
- @Override
- public CompletableFuture<Boolean> remove(String key, long version) {
- return asyncFuture(delegateMap.remove(key, version));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> replace(String key, V value) {
- return asyncFuture(delegateMap.replace(key, value));
- }
-
- @Override
- public CompletableFuture<Boolean> replace(String key, V oldValue, V newValue) {
- return asyncFuture(delegateMap.replace(key, oldValue, newValue));
- }
-
- @Override
- public CompletableFuture<Boolean> replace(String key, long oldVersion, V newValue) {
- return asyncFuture(delegateMap.replace(key, oldVersion, newValue));
- }
-
- @Override
- public CompletableFuture<Version> begin(TransactionId transactionId) {
- return asyncFuture(delegateMap.begin(transactionId));
- }
-
- @Override
- public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, V>> transactionLog) {
- return asyncFuture(delegateMap.prepare(transactionLog));
- }
-
- @Override
- public CompletableFuture<Void> commit(TransactionId transactionId) {
- return asyncFuture(delegateMap.commit(transactionId));
- }
-
- @Override
- public CompletableFuture<Void> rollback(TransactionId transactionId) {
- return asyncFuture(delegateMap.rollback(transactionId));
- }
-
- @Override
- public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, V>> transactionLog) {
- return asyncFuture(delegateMap.prepareAndCommit(transactionLog));
- }
-
- @Override
- public CompletableFuture<Void> addListener(MapEventListener<String, V> listener) {
- return addListener(listener);
- }
-
- @Override
- public CompletableFuture<Void> addListener(MapEventListener<String, V> listener, Executor executor) {
- return asyncFuture(delegateMap.addListener(listener, executor));
- }
-
- @Override
- public CompletableFuture<Void> removeListener(MapEventListener<String, V> listener) {
- return asyncFuture(delegateMap.removeListener(listener));
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java
deleted file mode 100644
index f6fc3d1..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import com.google.common.collect.Maps;
-import org.onosproject.store.service.AsyncDocumentTree;
-import org.onosproject.store.service.DocumentPath;
-import org.onosproject.store.service.DocumentTreeListener;
-import org.onosproject.store.service.Versioned;
-
-/**
- * {@link AsyncDocumentTree} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingAsyncDocumentTree<V> extends ExecutingDistributedPrimitive implements AsyncDocumentTree<V> {
- private final AsyncDocumentTree<V> delegateTree;
- private final Executor orderedExecutor;
- private final Map<DocumentTreeListener<V>, DocumentTreeListener<V>> listenerMap = Maps.newConcurrentMap();
-
- public ExecutingAsyncDocumentTree(
- AsyncDocumentTree<V> delegateTree, Executor orderedExecutor, Executor threadPoolExecutor) {
- super(delegateTree, orderedExecutor, threadPoolExecutor);
- this.delegateTree = delegateTree;
- this.orderedExecutor = orderedExecutor;
- }
-
- @Override
- public DocumentPath root() {
- return delegateTree.root();
- }
-
- @Override
- public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
- return asyncFuture(delegateTree.getChildren(path));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> get(DocumentPath path) {
- return asyncFuture(delegateTree.get(path));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
- return asyncFuture(delegateTree.set(path, value));
- }
-
- @Override
- public CompletableFuture<Boolean> create(DocumentPath path, V value) {
- return asyncFuture(delegateTree.create(path, value));
- }
-
- @Override
- public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) {
- return asyncFuture(delegateTree.createRecursive(path, value));
- }
-
- @Override
- public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
- return asyncFuture(delegateTree.replace(path, newValue, version));
- }
-
- @Override
- public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
- return asyncFuture(delegateTree.replace(path, newValue, currentValue));
- }
-
- @Override
- public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
- return asyncFuture(delegateTree.removeNode(path));
- }
-
- @Override
- public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
- DocumentTreeListener<V> wrappedListener = e -> orderedExecutor.execute(() -> listener.event(e));
- listenerMap.put(listener, wrappedListener);
- return asyncFuture(delegateTree.addListener(path, wrappedListener));
- }
-
- @Override
- public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
- DocumentTreeListener<V> wrappedListener = listenerMap.remove(listener);
- if (wrappedListener != null) {
- return asyncFuture(delegateTree.removeListener(wrappedListener));
- }
- return CompletableFuture.completedFuture(null);
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java
deleted file mode 100644
index ba7cb81..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-
-import com.google.common.collect.Maps;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.Change;
-import org.onosproject.store.service.AsyncLeaderElector;
-
-/**
- * {@link AsyncLeaderElector} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingAsyncLeaderElector extends ExecutingDistributedPrimitive implements AsyncLeaderElector {
- private final AsyncLeaderElector delegateElector;
- private final Executor orderedExecutor;
- private final Map<Consumer<Change<Leadership>>, Consumer<Change<Leadership>>> listenerMap = Maps.newConcurrentMap();
-
- public ExecutingAsyncLeaderElector(
- AsyncLeaderElector delegateElector, Executor orderedExecutor, Executor threadPoolExecutor) {
- super(delegateElector, orderedExecutor, threadPoolExecutor);
- this.delegateElector = delegateElector;
- this.orderedExecutor = orderedExecutor;
- }
-
- @Override
- public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
- return asyncFuture(delegateElector.run(topic, nodeId));
- }
-
- @Override
- public CompletableFuture<Void> withdraw(String topic) {
- return asyncFuture(delegateElector.withdraw(topic));
- }
-
- @Override
- public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
- return asyncFuture(delegateElector.anoint(topic, nodeId));
- }
-
- @Override
- public CompletableFuture<Void> evict(NodeId nodeId) {
- return asyncFuture(delegateElector.evict(nodeId));
- }
-
- @Override
- public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
- return asyncFuture(delegateElector.promote(topic, nodeId));
- }
-
- @Override
- public CompletableFuture<Leadership> getLeadership(String topic) {
- return asyncFuture(delegateElector.getLeadership(topic));
- }
-
- @Override
- public CompletableFuture<Map<String, Leadership>> getLeaderships() {
- return asyncFuture(delegateElector.getLeaderships());
- }
-
- @Override
- public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> listener) {
- Consumer<Change<Leadership>> wrappedListener = e -> orderedExecutor.execute(() -> listener.accept(e));
- listenerMap.put(listener, wrappedListener);
- return asyncFuture(delegateElector.addChangeListener(wrappedListener));
- }
-
- @Override
- public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> listener) {
- Consumer<Change<Leadership>> wrappedListener = listenerMap.remove(listener);
- if (wrappedListener != null) {
- return asyncFuture(delegateElector.removeChangeListener(wrappedListener));
- }
- return CompletableFuture.completedFuture(null);
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
deleted file mode 100644
index 836a682..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-
-import com.google.common.collect.Maps;
-import org.onlab.util.Tools;
-import org.onosproject.store.service.DistributedPrimitive;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Base class for primitives that delegate asynchronous callbacks to a user provided {@link Executor}.
- */
-public abstract class ExecutingDistributedPrimitive
- extends DelegatingDistributedPrimitive {
- private final DistributedPrimitive primitive;
- private final Executor orderedExecutor;
- private final Executor threadPoolExecutor;
- private final Map<Consumer<Status>, Consumer<Status>> listenerMap = Maps.newConcurrentMap();
-
- protected ExecutingDistributedPrimitive(
- DistributedPrimitive primitive, Executor orderedExecutor, Executor threadPoolExecutor) {
- super(primitive);
- this.primitive = primitive;
- this.orderedExecutor = checkNotNull(orderedExecutor);
- this.threadPoolExecutor = checkNotNull(threadPoolExecutor);
- }
-
- /**
- * Creates a future to be completed asynchronously on the provided ordered and thread pool executors.
- *
- * @param future the future to be completed asynchronously
- * @param <T> future result type
- * @return a new {@link CompletableFuture} to be completed asynchronously using the primitive thread model
- */
- protected <T> CompletableFuture<T> asyncFuture(CompletableFuture<T> future) {
- return Tools.orderedFuture(future, orderedExecutor, threadPoolExecutor);
- }
-
- @Override
- public CompletableFuture<Void> destroy() {
- return asyncFuture(primitive.destroy());
- }
-
- @Override
- public void addStatusChangeListener(Consumer<DistributedPrimitive.Status> listener) {
- Consumer<DistributedPrimitive.Status> wrappedListener =
- status -> orderedExecutor.execute(() -> listener.accept(status));
- listenerMap.put(listener, wrappedListener);
- primitive.addStatusChangeListener(wrappedListener);
- }
-
- @Override
- public void removeStatusChangeListener(Consumer<DistributedPrimitive.Status> listener) {
- Consumer<DistributedPrimitive.Status> wrappedListener = listenerMap.remove(listener);
- if (wrappedListener != null) {
- primitive.removeStatusChangeListener(wrappedListener);
- }
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java
deleted file mode 100644
index e6290b8..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-
-import org.onosproject.store.service.AsyncAtomicValue;
-import org.onosproject.store.service.Task;
-import org.onosproject.store.service.WorkQueue;
-import org.onosproject.store.service.WorkQueueStats;
-
-/**
- * {@link AsyncAtomicValue} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingWorkQueue<E> extends ExecutingDistributedPrimitive implements WorkQueue<E> {
- private final WorkQueue<E> delegateQueue;
-
- public ExecutingWorkQueue(WorkQueue<E> delegateQueue, Executor orderedExecutor, Executor threadPoolExecutor) {
- super(delegateQueue, orderedExecutor, threadPoolExecutor);
- this.delegateQueue = delegateQueue;
- }
-
- @Override
- public CompletableFuture<Void> addMultiple(Collection<E> items) {
- return asyncFuture(delegateQueue.addMultiple(items));
- }
-
- @Override
- public CompletableFuture<Collection<Task<E>>> take(int maxItems) {
- return asyncFuture(delegateQueue.take(maxItems));
- }
-
- @Override
- public CompletableFuture<Void> complete(Collection<String> taskIds) {
- return asyncFuture(delegateQueue.complete(taskIds));
- }
-
- @Override
- public CompletableFuture<Void> registerTaskProcessor(
- Consumer<E> taskProcessor, int parallelism, Executor executor) {
- return asyncFuture(delegateQueue.registerTaskProcessor(taskProcessor, parallelism, executor));
- }
-
- @Override
- public CompletableFuture<Void> stopProcessing() {
- return asyncFuture(delegateQueue.stopProcessing());
- }
-
- @Override
- public CompletableFuture<WorkQueueStats> stats() {
- return asyncFuture(delegateQueue.stats());
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 5564844..ea8d075 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -40,8 +40,6 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.Executor;
-import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -60,13 +58,12 @@
}
@Override
- public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
checkNotNull(name);
checkNotNull(serializer);
Map<PartitionId, AsyncConsistentMap<K, V>> maps =
Maps.transformValues(members,
- partition -> partition.newAsyncConsistentMap(name, serializer, executorSupplier));
+ partition -> partition.newAsyncConsistentMap(name, serializer));
Hasher<K> hasher = key -> {
int hashCode = Hashing.sha256().hashBytes(serializer.encode(key)).asInt();
return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
@@ -75,51 +72,46 @@
}
@Override
- public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- return getCreator(name).newAsyncConsistentTreeMap(name, serializer, executorSupplier);
+ public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
+ return getCreator(name).newAsyncConsistentTreeMap(name, serializer);
}
@Override
- public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- return getCreator(name).newAsyncConsistentSetMultimap(name, serializer, executorSupplier);
+ public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(String name, Serializer serializer) {
+ return getCreator(name).newAsyncConsistentSetMultimap(name, serializer);
}
@Override
- public <E> AsyncDistributedSet<E> newAsyncDistributedSet(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer, executorSupplier));
+ public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
+ return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer));
}
@Override
- public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- return getCreator(name).newAsyncAtomicCounterMap(name, serializer, executorSupplier);
+ public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
+ return getCreator(name).newAsyncAtomicCounterMap(name, serializer);
}
@Override
- public AsyncAtomicCounter newAsyncCounter(String name, Supplier<Executor> executorSupplier) {
- return getCreator(name).newAsyncCounter(name, executorSupplier);
+ public AsyncAtomicCounter newAsyncCounter(String name) {
+ return getCreator(name).newAsyncCounter(name);
}
@Override
- public AsyncAtomicIdGenerator newAsyncIdGenerator(String name, Supplier<Executor> executorSupplier) {
- return getCreator(name).newAsyncIdGenerator(name, executorSupplier);
+ public AsyncAtomicIdGenerator newAsyncIdGenerator(String name) {
+ return getCreator(name).newAsyncIdGenerator(name);
}
@Override
- public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- return getCreator(name).newAsyncAtomicValue(name, serializer, executorSupplier);
+ public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
+ return getCreator(name).newAsyncAtomicValue(name, serializer);
}
@Override
- public AsyncLeaderElector newAsyncLeaderElector(String name, Supplier<Executor> executorSupplier) {
+ public AsyncLeaderElector newAsyncLeaderElector(String name) {
checkNotNull(name);
Map<PartitionId, AsyncLeaderElector> leaderElectors =
Maps.transformValues(members,
- partition -> partition.newAsyncLeaderElector(name, executorSupplier));
+ partition -> partition.newAsyncLeaderElector(name));
Hasher<String> hasher = topic -> {
int hashCode = Hashing.sha256().hashString(topic, Charsets.UTF_8).asInt();
return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
@@ -128,14 +120,13 @@
}
@Override
- public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- return getCreator(name).newWorkQueue(name, serializer, executorSupplier);
+ public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+ return getCreator(name).newWorkQueue(name, serializer);
}
@Override
- public <V> AsyncDocumentTree<V> newAsyncDocumentTree(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- return getCreator(name).newAsyncDocumentTree(name, serializer, executorSupplier);
+ public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
+ return getCreator(name).newAsyncDocumentTree(name, serializer);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java
deleted file mode 100644
index d541bb1..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.maxPriority;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.net.ConnectException;
-import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Predicate;
-
-import org.onlab.util.Tools;
-import org.onosproject.store.service.StorageException;
-import org.slf4j.Logger;
-
-import com.google.common.base.Throwables;
-
-import io.atomix.catalyst.transport.TransportException;
-import io.atomix.copycat.Query;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.copycat.error.QueryException;
-import io.atomix.copycat.error.UnknownSessionException;
-import io.atomix.copycat.session.ClosedSessionException;
-
-/**
- * Custom {@code CopycatClient} for injecting additional logic that runs before/after operation submission.
- */
-public class OnosCopycatClient extends DelegatingCopycatClient {
-
- private final int maxRetries;
- private final long delayBetweenRetriesMillis;
- private final ScheduledExecutorService executor;
- private final Logger log = getLogger(getClass());
-
- private final Predicate<Throwable> retryableCheck = e -> e instanceof ConnectException
- || e instanceof TimeoutException
- || e instanceof TransportException
- || e instanceof ClosedChannelException
- || e instanceof QueryException
- || e instanceof UnknownSessionException
- || e instanceof ClosedSessionException
- || e instanceof StorageException.Unavailable;
-
- OnosCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) {
- super(client);
- this.maxRetries = maxRetries;
- this.delayBetweenRetriesMillis = delayBetweenRetriesMillis;
- this.executor = newSingleThreadScheduledExecutor(maxPriority(groupedThreads("OnosCopycat", "client", log)));
- }
-
- @Override
- public CompletableFuture<Void> close() {
- executor.shutdown();
- return super.close();
- }
-
- @Override
- public <T> CompletableFuture<T> submit(Query<T> query) {
- if (state() == State.CLOSED) {
- return Tools.exceptionalFuture(new StorageException.Unavailable());
- }
- CompletableFuture<T> future = new CompletableFuture<>();
- executor.execute(() -> submit(query, 1, future));
- return future;
- }
-
- private <T> void submit(Query<T> query, int attemptIndex, CompletableFuture<T> future) {
- client.submit(query).whenComplete((r, e) -> {
- if (e != null) {
- if (attemptIndex < maxRetries + 1 && retryableCheck.test(Throwables.getRootCause(e))) {
- log.debug("Retry attempt ({} of {}). Failure due to {}",
- attemptIndex, maxRetries, Throwables.getRootCause(e).getClass());
- executor.schedule(() ->
- submit(query, attemptIndex + 1, future), delayBetweenRetriesMillis, TimeUnit.MILLISECONDS);
- } else {
- future.completeExceptionally(e);
- }
- } else {
- future.complete(r);
- }
- });
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index bacda00..2201bb5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -16,19 +16,16 @@
package org.onosproject.store.primitives.impl;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -46,7 +43,7 @@
import org.onosproject.cluster.PartitionDiff;
import org.onosproject.cluster.PartitionId;
import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.PartitionAdminService;
import org.onosproject.store.primitives.PartitionEvent;
@@ -56,11 +53,9 @@
import org.onosproject.store.service.PartitionInfo;
import org.slf4j.Logger;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.PARTITION_READ;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of {@code PartitionService} and {@code PartitionAdminService}.
@@ -73,7 +68,7 @@
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MessagingService messagingService;
+ protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataService metadataService;
@@ -84,9 +79,6 @@
private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
- private final ExecutorService sharedPrimitiveExecutor = Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
- groupedThreads("onos/primitives", "primitive-events", log));
@Activate
public void activate() {
@@ -96,10 +88,8 @@
currentClusterMetadata.get()
.getPartitions()
.forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
- messagingService,
+ clusterCommunicator,
clusterService,
- CatalystSerializers.getSerializer(),
- sharedPrimitiveExecutor,
new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
new file mode 100644
index 0000000..099e17a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.session.SessionId;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.service.Serializer;
+
+/**
+ * {@link RaftClientProtocol} implementation that carries Raft client traffic over the ONOS
+ * {@link ClusterCommunicationService}.
+ * <p>
+ * Request/response operations go through {@link RaftCommunicator#sendAndReceive}; every message
+ * subject is scoped by a {@code partition-<id>} prefix.
+ */
+public class RaftClientCommunicator extends RaftCommunicator implements RaftClientProtocol {
+
+    /**
+     * Creates a client communicator for the given partition.
+     *
+     * @param partitionId the partition whose identifier prefixes every message subject
+     * @param serializer the serializer used to encode outgoing and decode incoming messages
+     * @param clusterCommunicator the cluster communication service used as the transport
+     */
+    public RaftClientCommunicator(
+            PartitionId partitionId,
+            Serializer serializer,
+            ClusterCommunicationService clusterCommunicator) {
+        super(contextFor(partitionId), serializer, clusterCommunicator);
+    }
+
+    /**
+     * Builds the message context whose subjects carry the {@code partition-<id>} prefix.
+     *
+     * @param partitionId the partition to build the context for
+     * @return the message context for the partition
+     */
+    private static RaftMessageContext contextFor(PartitionId partitionId) {
+        String prefix = String.format("partition-%d", partitionId.id());
+        return new RaftMessageContext(prefix);
+    }
+
+    @Override
+    public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) {
+        return sendAndReceive(context.openSessionSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) {
+        return sendAndReceive(context.closeSessionSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) {
+        return sendAndReceive(context.keepAliveSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
+        return sendAndReceive(context.querySubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) {
+        return sendAndReceive(context.commandSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) {
+        return sendAndReceive(context.metadataSubject, request, memberId);
+    }
+
+    @Override
+    public void reset(Collection<MemberId> members, ResetRequest request) {
+        // The transport addresses ONOS nodes, so map each Raft member identifier to a NodeId.
+        Set<NodeId> targets = members.stream()
+                .map(member -> NodeId.nodeId(member.id()))
+                .collect(Collectors.toSet());
+        clusterCommunicator.multicast(
+                request,
+                context.resetSubject(request.session()),
+                serializer::encode,
+                targets);
+    }
+
+    @Override
+    public void registerPublishListener(SessionId sessionId, Consumer<PublishRequest> listener, Executor executor) {
+        // Publish requests arrive on a per-session subject and are decoded before reaching the listener.
+        clusterCommunicator.addSubscriber(
+                context.publishSubject(sessionId.id()),
+                serializer::decode,
+                listener,
+                executor);
+    }
+
+    @Override
+    public void unregisterPublishListener(SessionId sessionId) {
+        clusterCommunicator.removeSubscriber(context.publishSubject(sessionId.id()));
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
new file mode 100644
index 0000000..b708a34
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.net.ConnectException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import io.atomix.protocols.raft.RaftException;
+import io.atomix.protocols.raft.cluster.MemberId;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.cluster.messaging.MessagingException;
+import org.onosproject.store.service.Serializer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class for Raft protocol client/server.
+ * <p>
+ * Holds the message context, serializer and cluster communicator shared by subclasses and provides
+ * the common request/response helper.
+ */
+public abstract class RaftCommunicator {
+    protected final RaftMessageContext context;
+    protected final Serializer serializer;
+    protected final ClusterCommunicationService clusterCommunicator;
+
+    /**
+     * Creates a communicator.
+     *
+     * @param context the message context supplying message subjects
+     * @param serializer the serializer used to encode requests and decode responses
+     * @param clusterCommunicator the cluster communication service used as the transport
+     * @throws NullPointerException if any argument is {@code null}
+     */
+    public RaftCommunicator(
+            RaftMessageContext context,
+            Serializer serializer,
+            ClusterCommunicationService clusterCommunicator) {
+        this.context = checkNotNull(context, "context cannot be null");
+        this.serializer = checkNotNull(serializer, "serializer cannot be null");
+        this.clusterCommunicator = checkNotNull(clusterCommunicator, "clusterCommunicator cannot be null");
+    }
+
+    /**
+     * Sends a request to the given member and returns a future completed with its response.
+     * <p>
+     * Failures are passed through {@link #translateError(Throwable)} before the returned future is failed.
+     *
+     * @param subject the message subject to send on
+     * @param request the request to send
+     * @param memberId the member to send the request to
+     * @param <T> the request type
+     * @param <U> the response type
+     * @return a future to be completed with the response or failed with the translated error
+     */
+    protected <T, U> CompletableFuture<U> sendAndReceive(MessageSubject subject, T request, MemberId memberId) {
+        CompletableFuture<U> future = new CompletableFuture<>();
+        clusterCommunicator.<T, U>sendAndReceive(
+                request, subject, serializer::encode, serializer::decode, NodeId.nodeId(memberId.id()))
+                .whenComplete((result, error) -> {
+                    if (error == null) {
+                        future.complete(result);
+                    } else {
+                        future.completeExceptionally(translateError(error));
+                    }
+                });
+        return future;
+    }
+
+    /**
+     * Translates a messaging failure into the exception reported to the caller.
+     * <ul>
+     * <li>{@link MessagingException.NoRemoteHandler} becomes a {@link ConnectException};</li>
+     * <li>{@link MessagingException.RemoteHandlerFailure} and {@link MessagingException.ProtocolException}
+     * become a {@link RaftException.ProtocolException};</li>
+     * <li>anything else is returned unchanged.</li>
+     * </ul>
+     *
+     * @param error the failure reported by the cluster communicator, never {@code null}
+     * @return the exception to fail the caller's future with, never {@code null}
+     */
+    private static Throwable translateError(Throwable error) {
+        Throwable failure = error;
+        // Unwrap only when a cause exists: unwrapping a cause-less CompletionException would yield null,
+        // and completeExceptionally(null) throws, which would leave the caller's future pending forever.
+        if (failure instanceof CompletionException && failure.getCause() != null) {
+            failure = failure.getCause();
+        }
+        if (failure instanceof MessagingException.NoRemoteHandler) {
+            // ConnectException has no cause-accepting constructor, so attach the original via initCause.
+            ConnectException translated = new ConnectException(failure.getMessage());
+            translated.initCause(failure);
+            return translated;
+        }
+        if (failure instanceof MessagingException.RemoteHandlerFailure
+                || failure instanceof MessagingException.ProtocolException) {
+            return new RaftException.ProtocolException(failure.getMessage());
+        }
+        return failure;
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
new file mode 100644
index 0000000..2deb86a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+/**
+ * Protocol message context.
+ * <p>
+ * Holds the {@link MessageSubject}s used for Raft protocol messages. With a non-null prefix every
+ * subject name has the form {@code <prefix>-<type>}; with a {@code null} prefix the bare type is used.
+ */
+final class RaftMessageContext {
+    private final String prefix;
+    final MessageSubject openSessionSubject;
+    final MessageSubject closeSessionSubject;
+    final MessageSubject keepAliveSubject;
+    final MessageSubject querySubject;
+    final MessageSubject commandSubject;
+    final MessageSubject metadataSubject;
+    final MessageSubject joinSubject;
+    final MessageSubject leaveSubject;
+    final MessageSubject configureSubject;
+    final MessageSubject reconfigureSubject;
+    final MessageSubject installSubject;
+    final MessageSubject pollSubject;
+    final MessageSubject voteSubject;
+    final MessageSubject appendSubject;
+
+    /**
+     * Creates a context whose subjects are qualified with the given prefix.
+     *
+     * @param prefix the subject name prefix, or {@code null} for unprefixed subjects
+     */
+    RaftMessageContext(String prefix) {
+        this.prefix = prefix;
+        this.openSessionSubject = getSubject(prefix, "open");
+        this.closeSessionSubject = getSubject(prefix, "close");
+        this.keepAliveSubject = getSubject(prefix, "keep-alive");
+        this.querySubject = getSubject(prefix, "query");
+        this.commandSubject = getSubject(prefix, "command");
+        this.metadataSubject = getSubject(prefix, "metadata");
+        this.joinSubject = getSubject(prefix, "join");
+        this.leaveSubject = getSubject(prefix, "leave");
+        this.configureSubject = getSubject(prefix, "configure");
+        this.reconfigureSubject = getSubject(prefix, "reconfigure");
+        this.installSubject = getSubject(prefix, "install");
+        this.pollSubject = getSubject(prefix, "poll");
+        this.voteSubject = getSubject(prefix, "vote");
+        this.appendSubject = getSubject(prefix, "append");
+    }
+
+    /**
+     * Qualifies a subject name with the prefix, if one is set.
+     *
+     * @param prefix the prefix, may be {@code null}
+     * @param name the unqualified subject name
+     * @return {@code prefix-name}, or {@code name} alone when the prefix is {@code null}
+     */
+    private static String qualify(String prefix, String name) {
+        return prefix == null ? name : String.format("%s-%s", prefix, name);
+    }
+
+    /**
+     * Creates the fixed subject for the given message type.
+     *
+     * @param prefix the prefix, may be {@code null}
+     * @param type the message type name
+     * @return the subject for the message type
+     */
+    private static MessageSubject getSubject(String prefix, String type) {
+        return new MessageSubject(qualify(prefix, type));
+    }
+
+    /**
+     * Returns the publish subject for the given session.
+     *
+     * @param sessionId the session for which to return the publish subject
+     * @return the publish subject for the given session
+     */
+    MessageSubject publishSubject(long sessionId) {
+        return new MessageSubject(qualify(prefix, String.format("publish-%d", sessionId)));
+    }
+
+    /**
+     * Returns the reset subject for the given session.
+     *
+     * @param sessionId the session for which to return the reset subject
+     * @return the reset subject for the given session
+     */
+    MessageSubject resetSubject(long sessionId) {
+        return new MessageSubject(qualify(prefix, String.format("reset-%d", sessionId)));
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
new file mode 100644
index 0000000..b31810f
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -0,0 +1,301 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.session.SessionId;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.service.Serializer;
+
+/**
+ * Raft server protocol that uses a {@link ClusterCommunicationService}.
+ * <p>
+ * Each outbound request is sent on the subject that the {@link RaftMessageContext} holds for its
+ * request type, and each handler registration subscribes to (or unsubscribes from) that same subject.
+ * Requests and responses pass through the {@code serializer}'s {@code encode}/{@code decode} methods.
+ */
+public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol {
+
+ /**
+ * Creates a server communicator for one partition.
+ * <p>
+ * The message context is created with the prefix {@code partition-<id>}, built from the
+ * numeric partition identifier.
+ *
+ * @param partitionId identifier of the partition this communicator serves
+ * @param serializer serializer passed to the superclass constructor
+ * @param clusterCommunicator cluster communication service passed to the superclass constructor
+ */
+ public RaftServerCommunicator(
+ PartitionId partitionId,
+ Serializer serializer,
+ ClusterCommunicationService clusterCommunicator) {
+ super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
+ }
+
+ // Outbound requests: each method forwards the request to the given member through the
+ // sendAndReceive helper (not defined in this class) and returns the future it produces.
+
+ @Override
+ public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) {
+ return sendAndReceive(context.openSessionSubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) {
+ return sendAndReceive(context.closeSessionSubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) {
+ return sendAndReceive(context.keepAliveSubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
+ return sendAndReceive(context.querySubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) {
+ return sendAndReceive(context.commandSubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) {
+ return sendAndReceive(context.metadataSubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
+ return sendAndReceive(context.joinSubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
+ return sendAndReceive(context.leaveSubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<ConfigureResponse> configure(MemberId memberId, ConfigureRequest request) {
+ return sendAndReceive(context.configureSubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId, ReconfigureRequest request) {
+ return sendAndReceive(context.reconfigureSubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
+ return sendAndReceive(context.installSubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
+ return sendAndReceive(context.pollSubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
+ return sendAndReceive(context.voteSubject, request, memberId);
+ }
+
+ @Override
+ public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
+ return sendAndReceive(context.appendSubject, request, memberId);
+ }
+
+ // Publish is a one-way unicast on the per-session publish subject; whatever unicast returns is
+ // discarded here, so the caller gets no delivery result.
+ @Override
+ public void publish(MemberId memberId, PublishRequest request) {
+ clusterCommunicator.unicast(request,
+ context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.id()));
+ }
+
+ // Inbound handlers: each register method subscribes the handler to the subject of its request type,
+ // passing serializer::decode for the request and serializer::encode for the response; each
+ // unregister method removes the subscriber for that subject.
+
+ @Override
+ public void registerOpenSessionHandler(
+ Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterOpenSessionHandler() {
+ clusterCommunicator.removeSubscriber(context.openSessionSubject);
+ }
+
+ @Override
+ public void registerCloseSessionHandler(
+ Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.closeSessionSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterCloseSessionHandler() {
+ clusterCommunicator.removeSubscriber(context.closeSessionSubject);
+ }
+
+ @Override
+ public void registerKeepAliveHandler(Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.keepAliveSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterKeepAliveHandler() {
+ clusterCommunicator.removeSubscriber(context.keepAliveSubject);
+ }
+
+ @Override
+ public void registerQueryHandler(Function<QueryRequest, CompletableFuture<QueryResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.querySubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterQueryHandler() {
+ clusterCommunicator.removeSubscriber(context.querySubject);
+ }
+
+ @Override
+ public void registerCommandHandler(Function<CommandRequest, CompletableFuture<CommandResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.commandSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterCommandHandler() {
+ clusterCommunicator.removeSubscriber(context.commandSubject);
+ }
+
+ @Override
+ public void registerMetadataHandler(Function<MetadataRequest, CompletableFuture<MetadataResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.metadataSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterMetadataHandler() {
+ clusterCommunicator.removeSubscriber(context.metadataSubject);
+ }
+
+ @Override
+ public void registerJoinHandler(Function<JoinRequest, CompletableFuture<JoinResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.joinSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterJoinHandler() {
+ clusterCommunicator.removeSubscriber(context.joinSubject);
+ }
+
+ @Override
+ public void registerLeaveHandler(Function<LeaveRequest, CompletableFuture<LeaveResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.leaveSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterLeaveHandler() {
+ clusterCommunicator.removeSubscriber(context.leaveSubject);
+ }
+
+ @Override
+ public void registerConfigureHandler(Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.configureSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterConfigureHandler() {
+ clusterCommunicator.removeSubscriber(context.configureSubject);
+ }
+
+ @Override
+ public void registerReconfigureHandler(
+ Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.reconfigureSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterReconfigureHandler() {
+ clusterCommunicator.removeSubscriber(context.reconfigureSubject);
+ }
+
+ @Override
+ public void registerInstallHandler(Function<InstallRequest, CompletableFuture<InstallResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.installSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterInstallHandler() {
+ clusterCommunicator.removeSubscriber(context.installSubject);
+ }
+
+ @Override
+ public void registerPollHandler(Function<PollRequest, CompletableFuture<PollResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.pollSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterPollHandler() {
+ clusterCommunicator.removeSubscriber(context.pollSubject);
+ }
+
+ @Override
+ public void registerVoteHandler(Function<VoteRequest, CompletableFuture<VoteResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.voteSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterVoteHandler() {
+ clusterCommunicator.removeSubscriber(context.voteSubject);
+ }
+
+ @Override
+ public void registerAppendHandler(Function<AppendRequest, CompletableFuture<AppendResponse>> handler) {
+ clusterCommunicator.addSubscriber(context.appendSubject, serializer::decode, handler, serializer::encode);
+ }
+
+ @Override
+ public void unregisterAppendHandler() {
+ clusterCommunicator.removeSubscriber(context.appendSubject);
+ }
+
+ // Reset listeners are per session: the subject is derived from the session id, and the executor is
+ // passed through to addSubscriber (presumably the listener is invoked on it; confirm against
+ // ClusterCommunicationService).
+ @Override
+ public void registerResetListener(SessionId sessionId, Consumer<ResetRequest> listener, Executor executor) {
+ clusterCommunicator.addSubscriber(context.resetSubject(sessionId.id()), serializer::decode, listener, executor);
+ }
+
+ @Override
+ public void unregisterResetListener(SessionId sessionId) {
+ clusterCommunicator.removeSubscriber(context.resetSubject(sessionId.id()));
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
new file mode 100644
index 0000000..2db6aff
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+
+import io.atomix.protocols.raft.RaftError;
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.cluster.RaftMember;
+import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
+import io.atomix.protocols.raft.event.RaftEvent;
+import io.atomix.protocols.raft.event.impl.DefaultEventType;
+import io.atomix.protocols.raft.operation.OperationType;
+import io.atomix.protocols.raft.operation.RaftOperation;
+import io.atomix.protocols.raft.operation.impl.DefaultOperationId;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.RaftResponse;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.session.RaftSessionMetadata;
+import io.atomix.protocols.raft.session.SessionId;
+import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
+import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
+import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
+import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
+import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
+import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
+import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
+import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
+import io.atomix.protocols.raft.storage.system.Configuration;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapEvents;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapEvents;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixCounterOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorEvents;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueEvents;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations;
+import org.onosproject.store.serializers.KryoNamespaces;
+
+/**
+ * Storage serializer namespaces.
+ * <p>
+ * Holds two Kryo namespaces for Raft: one for protocol messages and one for storage.
+ * NOTE(review): ids are presumably assigned in registration order starting from {@code nextId(...)},
+ * so reordering or removing an entry would renumber the ones after it; confirm against KryoNamespace
+ * before editing either list.
+ */
+public final class StorageNamespaces {
+
+ /**
+ * Raft protocol namespace.
+ * <p>
+ * Built on {@link KryoNamespaces#BASIC}, with further registrations starting at
+ * {@link KryoNamespaces#BEGIN_USER_CUSTOM_ID}: the Raft request/response messages, log entry types,
+ * supporting Raft value types, a few JDK collection types, and the operation and event types of the
+ * Atomix primitives.
+ */
+ public static final KryoNamespace RAFT_PROTOCOL = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .register(OpenSessionRequest.class)
+ .register(OpenSessionResponse.class)
+ .register(CloseSessionRequest.class)
+ .register(CloseSessionResponse.class)
+ .register(KeepAliveRequest.class)
+ .register(KeepAliveResponse.class)
+ .register(QueryRequest.class)
+ .register(QueryResponse.class)
+ .register(CommandRequest.class)
+ .register(CommandResponse.class)
+ .register(MetadataRequest.class)
+ .register(MetadataResponse.class)
+ .register(JoinRequest.class)
+ .register(JoinResponse.class)
+ .register(LeaveRequest.class)
+ .register(LeaveResponse.class)
+ .register(ConfigureRequest.class)
+ .register(ConfigureResponse.class)
+ .register(ReconfigureRequest.class)
+ .register(ReconfigureResponse.class)
+ .register(InstallRequest.class)
+ .register(InstallResponse.class)
+ .register(PollRequest.class)
+ .register(PollResponse.class)
+ .register(VoteRequest.class)
+ .register(VoteResponse.class)
+ .register(AppendRequest.class)
+ .register(AppendResponse.class)
+ .register(PublishRequest.class)
+ .register(ResetRequest.class)
+ .register(RaftResponse.Status.class)
+ .register(RaftError.class)
+ .register(RaftError.Type.class)
+ .register(ReadConsistency.class)
+ .register(RaftSessionMetadata.class)
+ .register(CloseSessionEntry.class)
+ .register(CommandEntry.class)
+ .register(ConfigurationEntry.class)
+ .register(InitializeEntry.class)
+ .register(KeepAliveEntry.class)
+ .register(MetadataEntry.class)
+ .register(OpenSessionEntry.class)
+ .register(QueryEntry.class)
+ .register(RaftOperation.class)
+ .register(RaftEvent.class)
+ .register(DefaultEventType.class)
+ .register(DefaultOperationId.class)
+ .register(OperationType.class)
+ // NOTE(review): ReadConsistency is already registered earlier in this chain; this second
+ // registration looks redundant, but dropping it may shift the ids of the entries below.
+ // Verify against KryoNamespace before removing.
+ .register(ReadConsistency.class)
+ .register(ArrayList.class)
+ .register(LinkedList.class)
+ .register(Collections.emptyList().getClass())
+ .register(HashSet.class)
+ .register(DefaultRaftMember.class)
+ .register(MemberId.class)
+ .register(SessionId.class)
+ .register(RaftMember.Type.class)
+ .register(RaftMember.Status.class)
+ .register(Instant.class)
+ .register(Configuration.class)
+ .register(AtomixAtomicCounterMapOperations.class)
+ .register(AtomixConsistentMapEvents.class)
+ .register(AtomixConsistentMapOperations.class)
+ .register(AtomixConsistentSetMultimapOperations.class)
+ .register(AtomixConsistentSetMultimapEvents.class)
+ .register(AtomixConsistentTreeMapEvents.class)
+ .register(AtomixConsistentTreeMapOperations.class)
+ .register(AtomixCounterOperations.class)
+ .register(AtomixDocumentTreeEvents.class)
+ .register(AtomixDocumentTreeOperations.class)
+ .register(AtomixLeaderElectorEvents.class)
+ .register(AtomixLeaderElectorOperations.class)
+ .register(AtomixWorkQueueEvents.class)
+ .register(AtomixWorkQueueOperations.class)
+ .build("RaftProtocol");
+
+ /**
+ * Raft storage namespace.
+ * <p>
+ * Built on {@link KryoNamespaces#BASIC}, with further registrations starting at
+ * {@code BEGIN_USER_CUSTOM_ID + 100}: the log entry types, Raft member and configuration types, and
+ * the operation types of the Atomix primitives. Unlike {@link #RAFT_PROTOCOL}, it registers no
+ * request/response messages and no event types.
+ */
+ public static final KryoNamespace RAFT_STORAGE = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100)
+ .register(CloseSessionEntry.class)
+ .register(CommandEntry.class)
+ .register(ConfigurationEntry.class)
+ .register(InitializeEntry.class)
+ .register(KeepAliveEntry.class)
+ .register(MetadataEntry.class)
+ .register(OpenSessionEntry.class)
+ .register(QueryEntry.class)
+ .register(RaftOperation.class)
+ .register(ReadConsistency.class)
+ .register(ArrayList.class)
+ .register(HashSet.class)
+ .register(DefaultRaftMember.class)
+ .register(MemberId.class)
+ .register(RaftMember.Type.class)
+ .register(RaftMember.Status.class)
+ .register(Instant.class)
+ .register(Configuration.class)
+ .register(AtomixAtomicCounterMapOperations.class)
+ .register(AtomixConsistentMapOperations.class)
+ .register(AtomixConsistentSetMultimapOperations.class)
+ .register(AtomixConsistentTreeMapOperations.class)
+ .register(AtomixCounterOperations.class)
+ .register(AtomixDocumentTreeOperations.class)
+ .register(AtomixLeaderElectorOperations.class)
+ .register(AtomixWorkQueueOperations.class)
+ .build("RaftStorage");
+
+ private StorageNamespaces() {
+ // Constants holder; not meant to be instantiated.
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index e9261c8..051c95a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -15,31 +15,36 @@
*/
package org.onosproject.store.primitives.impl;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.resource.ResourceType;
-
import java.io.File;
import java.util.Collection;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableMap;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.service.RaftService;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapService;
+import org.onosproject.store.primitives.resources.impl.AtomixCounterService;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeService;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorService;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueService;
+import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.PartitionInfo;
-
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableSet;
+import org.onosproject.store.service.Serializer;
/**
* Storage partition.
@@ -47,32 +52,32 @@
public class StoragePartition implements Managed<StoragePartition> {
private final AtomicBoolean isOpened = new AtomicBoolean(false);
- private final Serializer serializer;
- private final Executor sharedExecutor;
- private final MessagingService messagingService;
- private final ClusterService clusterService;
+ private final ClusterCommunicationService clusterCommunicator;
private final File logFolder;
private Partition partition;
private NodeId localNodeId;
private StoragePartitionServer server;
private StoragePartitionClient client;
- public static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
- new ResourceType(AtomixLeaderElector.class),
- new ResourceType(AtomixConsistentMap.class));
+ /**
+ * Factories for the Raft services, keyed by the name of the {@link DistributedPrimitive.Type}
+ * each one is registered under. Every supplier invokes the service's no-argument constructor.
+ */
+ public static final Map<String, Supplier<RaftService>> RAFT_SERVICES =
+ ImmutableMap.<String, Supplier<RaftService>>builder()
+ .put(DistributedPrimitive.Type.CONSISTENT_MAP.name(), AtomixConsistentMapService::new)
+ .put(DistributedPrimitive.Type.CONSISTENT_TREEMAP.name(), AtomixConsistentTreeMapService::new)
+ .put(DistributedPrimitive.Type.CONSISTENT_MULTIMAP.name(), AtomixConsistentSetMultimapService::new)
+ .put(DistributedPrimitive.Type.COUNTER_MAP.name(), AtomixAtomicCounterMapService::new)
+ .put(DistributedPrimitive.Type.COUNTER.name(), AtomixCounterService::new)
+ .put(DistributedPrimitive.Type.LEADER_ELECTOR.name(), AtomixLeaderElectorService::new)
+ .put(DistributedPrimitive.Type.WORK_QUEUE.name(), AtomixWorkQueueService::new)
+ .put(DistributedPrimitive.Type.DOCUMENT_TREE.name(), AtomixDocumentTreeService::new)
+ .build();
public StoragePartition(Partition partition,
- MessagingService messagingService,
+ ClusterCommunicationService clusterCommunicator,
ClusterService clusterService,
- Serializer serializer,
- Executor sharedExecutor,
File logFolder) {
this.partition = partition;
- this.messagingService = messagingService;
- this.clusterService = clusterService;
+ this.clusterCommunicator = clusterCommunicator;
this.localNodeId = clusterService.getLocalNode().id();
- this.serializer = serializer;
- this.sharedExecutor = sharedExecutor;
this.logFolder = logFolder;
}
@@ -87,10 +92,14 @@
@Override
public CompletableFuture<Void> open() {
if (partition.getMembers().contains(localNodeId)) {
- openServer();
+ return openServer()
+ .thenCompose(v -> openClient())
+ .thenAccept(v -> isOpened.set(true))
+ .thenApply(v -> null);
}
- return openClient().thenAccept(v -> isOpened.set(true))
- .thenApply(v -> null);
+ return openClient()
+ .thenAccept(v -> isOpened.set(true))
+ .thenApply(v -> null);
}
@Override
@@ -117,11 +126,11 @@
}
/**
- * Returns the {@link Address addresses} of partition members.
- * @return partition member addresses
+ * Returns the {@link MemberId identifiers} of partition members.
+ * @return partition member identifiers
*/
- public Collection<Address> getMemberAddresses() {
- return Collections2.transform(partition.getMembers(), this::toAddress);
+ public Collection<MemberId> getMemberIds() {
+ return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
}
/**
@@ -132,10 +141,12 @@
if (!partition.getMembers().contains(localNodeId) || server != null) {
return CompletableFuture.completedFuture(null);
}
- StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
- this,
- serializer,
- () -> new CopycatTransport(partition.getId(), messagingService),
+ StoragePartitionServer server = new StoragePartitionServer(this,
+ MemberId.from(localNodeId.id()),
+ () -> new RaftServerCommunicator(
+ partition.getId(),
+ Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+ clusterCommunicator),
logFolder);
return server.open().thenRun(() -> this.server = server);
}
@@ -149,19 +160,24 @@
.stream()
.filter(nodeId -> !nodeId.equals(localNodeId))
.collect(Collectors.toSet());
- StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
- this,
- serializer,
- () -> new CopycatTransport(partition.getId(), messagingService),
+ StoragePartitionServer server = new StoragePartitionServer(this,
+ MemberId.from(localNodeId.id()),
+ () -> new RaftServerCommunicator(
+ partition.getId(),
+ Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+ clusterCommunicator),
logFolder);
- return server.join(Collections2.transform(otherMembers, this::toAddress)).thenRun(() -> this.server = server);
+ return server.join(Collections2.transform(otherMembers, n -> MemberId.from(n.id())))
+ .thenRun(() -> this.server = server);
}
private CompletableFuture<StoragePartitionClient> openClient() {
client = new StoragePartitionClient(this,
- serializer,
- new CopycatTransport(partition.getId(), messagingService),
- sharedExecutor);
+ MemberId.from(localNodeId.id()),
+ new RaftClientCommunicator(
+ partition.getId(),
+ Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+ clusterCommunicator));
return client.open().thenApply(v -> client);
}
@@ -185,11 +201,6 @@
return CompletableFuture.completedFuture(null);
}
- private Address toAddress(NodeId nodeId) {
- ControllerNode node = clusterService.getNode(nodeId);
- return new Address(node.ip().toString(), node.tcpPort());
- }
-
/**
* Returns the partition information if this partition is locally managed i.e.
* this node is a active member of the partition.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index d97868c..b55efc4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -15,30 +15,19 @@
*/
package org.onosproject.store.primitives.impl;
-import java.util.Collection;
+import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Supplier;
+import java.util.stream.Collectors;
import com.google.common.base.Suppliers;
-import io.atomix.AtomixClient;
-import io.atomix.catalyst.transport.Transport;
-import io.atomix.copycat.client.ConnectionStrategies;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.copycat.client.CopycatClient.State;
-import io.atomix.copycat.client.RecoveryStrategies;
-import io.atomix.copycat.client.ServerSelectionStrategies;
-import io.atomix.manager.ResourceClient;
-import io.atomix.manager.ResourceManagerException;
-import io.atomix.manager.util.ResourceManagerTypeResolver;
-import io.atomix.resource.ResourceRegistry;
-import io.atomix.resource.ResourceType;
-import io.atomix.variables.DistributedLong;
+import io.atomix.protocols.raft.RaftClient;
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.proxy.CommunicationStrategy;
+import io.atomix.protocols.raft.session.RaftSessionMetadata;
import org.onlab.util.HexString;
-import org.onlab.util.OrderedExecutor;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMap;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
@@ -60,7 +49,7 @@
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncLeaderElector;
-import org.onosproject.store.service.DistributedPrimitive.Status;
+import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WorkQueue;
@@ -76,55 +65,33 @@
private final Logger log = getLogger(getClass());
private final StoragePartition partition;
- private final Transport transport;
- private final io.atomix.catalyst.serializer.Serializer serializer;
- private final Executor sharedExecutor;
- private AtomixClient client;
- private ResourceClient resourceClient;
+ private final MemberId localMemberId;
+ private final RaftClientProtocol protocol;
+ private RaftClient client;
private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
private final com.google.common.base.Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
Serializer.using(KryoNamespaces.BASIC)));
- Function<State, Status> mapper = state -> {
- switch (state) {
- case CONNECTED:
- return Status.ACTIVE;
- case SUSPENDED:
- return Status.SUSPENDED;
- case CLOSED:
- return Status.INACTIVE;
- default:
- throw new IllegalStateException("Unknown state " + state);
- }
- };
- public StoragePartitionClient(StoragePartition partition,
- io.atomix.catalyst.serializer.Serializer serializer,
- Transport transport,
- Executor sharedExecutor) {
+ public StoragePartitionClient(StoragePartition partition, MemberId localMemberId, RaftClientProtocol protocol) {
this.partition = partition;
- this.serializer = serializer;
- this.transport = transport;
- this.sharedExecutor = sharedExecutor;
+ this.localMemberId = localMemberId;
+ this.protocol = protocol;
}
@Override
public CompletableFuture<Void> open() {
synchronized (StoragePartitionClient.this) {
- resourceClient = newResourceClient(transport,
- serializer.clone(),
- StoragePartition.RESOURCE_TYPES);
- resourceClient.client().onStateChange(state -> log.debug("Partition {} client state"
- + " changed to {}", partition.getId(), state));
- client = new AtomixClient(resourceClient);
+ client = newRaftClient(protocol);
}
- return client.connect(partition.getMemberAddresses()).whenComplete((r, e) -> {
+ return client.connect(partition.getMemberIds()).whenComplete((r, e) -> {
if (e == null) {
log.info("Successfully started client for partition {}", partition.getId());
} else {
log.info("Failed to start client for partition {}", partition.getId(), e);
}
}).thenApply(v -> null);
+
}
@Override
@@ -132,25 +99,19 @@
return client != null ? client.close() : CompletableFuture.completedFuture(null);
}
- /**
- * Returns the executor provided by the given supplier or a serial executor if the supplier is {@code null}.
- *
- * @param executorSupplier the user-provided executor supplier
- * @return the executor
- */
- private Executor defaultExecutor(Supplier<Executor> executorSupplier) {
- return executorSupplier != null ? executorSupplier.get() : new OrderedExecutor(sharedExecutor);
- }
-
@Override
- public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join();
- Consumer<State> statusListener = state -> {
- atomixConsistentMap.statusChangeListeners()
- .forEach(listener -> listener.accept(mapper.apply(state)));
- };
- resourceClient.client().onStateChange(statusListener);
+ public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
+ AtomixConsistentMap atomixConsistentMap =
+ new AtomixConsistentMap(client.newProxyBuilder()
+ .withName(name)
+ .withServiceType(DistributedPrimitive.Type.CONSISTENT_MAP.name())
+ .withReadConsistency(ReadConsistency.SEQUENTIAL)
+ .withCommunicationStrategy(CommunicationStrategy.ANY)
+ .withTimeout(Duration.ofSeconds(30))
+ .withMaxRetries(5)
+ .build()
+ .open()
+ .join());
AsyncConsistentMap<String, byte[]> rawMap =
new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
@@ -162,24 +123,27 @@
// We have to ensure serialization is done on the Copycat threads since Kryo is not thread safe.
AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.newTranscodingMap(rawMap,
- key -> HexString.toHexString(serializer.encode(key)),
- string -> serializer.decode(HexString.fromHexString(string)),
- value -> value == null ? null : serializer.encode(value),
- bytes -> serializer.decode(bytes));
+ key -> HexString.toHexString(serializer.encode(key)),
+ string -> serializer.decode(HexString.fromHexString(string)),
+ value -> value == null ? null : serializer.encode(value),
+ bytes -> serializer.decode(bytes));
- return new ExecutingAsyncConsistentMap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor);
+ return transcodedMap;
}
@Override
- public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
AtomixConsistentTreeMap atomixConsistentTreeMap =
- client.getResource(name, AtomixConsistentTreeMap.class).join();
- Consumer<State> statusListener = state -> {
- atomixConsistentTreeMap.statusChangeListeners()
- .forEach(listener -> listener.accept(mapper.apply(state)));
- };
- resourceClient.client().onStateChange(statusListener);
+ new AtomixConsistentTreeMap(client.newProxyBuilder()
+ .withName(name)
+ .withServiceType(DistributedPrimitive.Type.CONSISTENT_TREEMAP.name())
+ .withReadConsistency(ReadConsistency.SEQUENTIAL)
+ .withCommunicationStrategy(CommunicationStrategy.ANY)
+ .withTimeout(Duration.ofSeconds(30))
+ .withMaxRetries(5)
+ .build()
+ .open()
+ .join());
AsyncConsistentTreeMap<byte[]> rawMap =
new DelegatingAsyncConsistentTreeMap<byte[]>(atomixConsistentTreeMap) {
@@ -191,24 +155,26 @@
AsyncConsistentTreeMap<V> transcodedMap =
DistributedPrimitives.<V, byte[]>newTranscodingTreeMap(
- rawMap,
- value -> value == null ? null : serializer.encode(value),
- bytes -> serializer.decode(bytes));
+ rawMap,
+ value -> value == null ? null : serializer.encode(value),
+ bytes -> serializer.decode(bytes));
- return new ExecutingAsyncConsistentTreeMap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor);
+ return transcodedMap;
}
@Override
- public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(String name, Serializer serializer) {
AtomixConsistentSetMultimap atomixConsistentSetMultimap =
- client.getResource(name, AtomixConsistentSetMultimap.class)
- .join();
- Consumer<State> statusListener = state -> {
- atomixConsistentSetMultimap.statusChangeListeners()
- .forEach(listener -> listener.accept(mapper.apply(state)));
- };
- resourceClient.client().onStateChange(statusListener);
+ new AtomixConsistentSetMultimap(client.newProxyBuilder()
+ .withName(name)
+ .withServiceType(DistributedPrimitive.Type.CONSISTENT_MULTIMAP.name())
+ .withReadConsistency(ReadConsistency.SEQUENTIAL)
+ .withCommunicationStrategy(CommunicationStrategy.ANY)
+ .withTimeout(Duration.ofSeconds(30))
+ .withMaxRetries(5)
+ .build()
+ .open()
+ .join());
AsyncConsistentMultimap<String, byte[]> rawMap =
new DelegatingAsyncConsistentMultimap<String, byte[]>(
@@ -227,97 +193,136 @@
value -> serializer.encode(value),
bytes -> serializer.decode(bytes));
- return new ExecutingAsyncConsistentMultimap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor);
+ return transcodedMap;
}
@Override
- public <E> AsyncDistributedSet<E> newAsyncDistributedSet(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer, executorSupplier));
+ public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
+ return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer));
}
@Override
- public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- AtomixAtomicCounterMap atomixAtomicCounterMap =
- client.getResource(name, AtomixAtomicCounterMap.class)
- .join();
+ public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
+ AtomixAtomicCounterMap atomixAtomicCounterMap = new AtomixAtomicCounterMap(client.newProxyBuilder()
+ .withName(name)
+ .withServiceType(DistributedPrimitive.Type.COUNTER_MAP.name())
+ .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
+ .withCommunicationStrategy(CommunicationStrategy.LEADER)
+ .withTimeout(Duration.ofSeconds(30))
+ .withMaxRetries(5)
+ .build()
+ .open()
+ .join());
AsyncAtomicCounterMap<K> transcodedMap =
- DistributedPrimitives.<K, String>newTranscodingAtomicCounterMap(
- atomixAtomicCounterMap,
+ DistributedPrimitives.newTranscodingAtomicCounterMap(
+ atomixAtomicCounterMap,
key -> HexString.toHexString(serializer.encode(key)),
string -> serializer.decode(HexString.fromHexString(string)));
- return new ExecutingAsyncAtomicCounterMap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor);
+ return transcodedMap;
}
@Override
- public AsyncAtomicCounter newAsyncCounter(String name, Supplier<Executor> executorSupplier) {
- DistributedLong distributedLong = client.getLong(name).join();
- AsyncAtomicCounter asyncCounter = new AtomixCounter(name, distributedLong);
- return new ExecutingAsyncAtomicCounter(asyncCounter, defaultExecutor(executorSupplier), sharedExecutor);
+ public AsyncAtomicCounter newAsyncCounter(String name) {
+ return new AtomixCounter(client.newProxyBuilder()
+ .withName(name)
+ .withServiceType(DistributedPrimitive.Type.COUNTER.name())
+ .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
+ .withCommunicationStrategy(CommunicationStrategy.LEADER)
+ .withTimeout(Duration.ofSeconds(30))
+ .withMaxRetries(5)
+ .build()
+ .open()
+ .join());
}
@Override
- public AsyncAtomicIdGenerator newAsyncIdGenerator(String name, Supplier<Executor> executorSupplier) {
- DistributedLong distributedLong = client.getLong(name).join();
- AsyncAtomicIdGenerator asyncIdGenerator = new AtomixIdGenerator(name, distributedLong);
- return new ExecutingAsyncAtomicIdGenerator(asyncIdGenerator, defaultExecutor(executorSupplier), sharedExecutor);
+ public AsyncAtomicIdGenerator newAsyncIdGenerator(String name) {
+ return new AtomixIdGenerator(newAsyncCounter(name));
}
@Override
- public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- AsyncAtomicValue<V> asyncValue = new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
- return new ExecutingAsyncAtomicValue<>(asyncValue, defaultExecutor(executorSupplier), sharedExecutor);
+ public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
+ return new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
}
@Override
- public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- AtomixWorkQueue atomixWorkQueue = client.getResource(name, AtomixWorkQueue.class).join();
- WorkQueue<E> workQueue = new DefaultDistributedWorkQueue<>(atomixWorkQueue, serializer);
- return new ExecutingWorkQueue<>(workQueue, defaultExecutor(executorSupplier), sharedExecutor);
+ public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+ AtomixWorkQueue atomixWorkQueue = new AtomixWorkQueue(client.newProxyBuilder()
+ .withName(name)
+ .withServiceType(DistributedPrimitive.Type.WORK_QUEUE.name())
+ .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
+ .withCommunicationStrategy(CommunicationStrategy.LEADER)
+ .withTimeout(Duration.ofSeconds(5))
+ .withMaxRetries(5)
+ .build()
+ .open()
+ .join());
+ return new DefaultDistributedWorkQueue<>(atomixWorkQueue, serializer);
}
@Override
- public <V> AsyncDocumentTree<V> newAsyncDocumentTree(
- String name, Serializer serializer, Supplier<Executor> executorSupplier) {
- AtomixDocumentTree atomixDocumentTree = client.getResource(name, AtomixDocumentTree.class).join();
- AsyncDocumentTree<V> asyncDocumentTree = new DefaultDistributedDocumentTree<>(
- name, atomixDocumentTree, serializer);
- return new ExecutingAsyncDocumentTree<>(asyncDocumentTree, defaultExecutor(executorSupplier), sharedExecutor);
+ public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
+ AtomixDocumentTree atomixDocumentTree = new AtomixDocumentTree(client.newProxyBuilder()
+ .withName(name)
+ .withServiceType(DistributedPrimitive.Type.DOCUMENT_TREE.name())
+ .withReadConsistency(ReadConsistency.SEQUENTIAL)
+ .withCommunicationStrategy(CommunicationStrategy.ANY)
+ .withTimeout(Duration.ofSeconds(30))
+ .withMaxRetries(5)
+ .build()
+ .open()
+ .join());
+ return new DefaultDistributedDocumentTree<>(name, atomixDocumentTree, serializer);
}
@Override
- public AsyncLeaderElector newAsyncLeaderElector(String name, Supplier<Executor> executorSupplier) {
- AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
- .thenCompose(AtomixLeaderElector::setupCache)
- .join();
- Consumer<State> statusListener = state -> leaderElector.statusChangeListeners()
- .forEach(listener -> listener.accept(mapper.apply(state)));
- resourceClient.client().onStateChange(statusListener);
- return new ExecutingAsyncLeaderElector(leaderElector, defaultExecutor(executorSupplier), sharedExecutor);
+ public AsyncLeaderElector newAsyncLeaderElector(String name) {
+ AtomixLeaderElector leaderElector = new AtomixLeaderElector(client.newProxyBuilder()
+ .withName(name)
+ .withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
+ .withReadConsistency(ReadConsistency.LINEARIZABLE)
+ .withCommunicationStrategy(CommunicationStrategy.LEADER)
+ .withTimeout(Duration.ofSeconds(5))
+ .withMaxRetries(5)
+ .build()
+ .open()
+ .join());
+ leaderElector.setupCache().join();
+ return leaderElector;
}
@Override
public Set<String> getAsyncConsistentMapNames() {
- return client.keys(AtomixConsistentMap.class).join();
+ return client.metadata().getSessions(DistributedPrimitive.Type.CONSISTENT_MAP.name())
+ .join()
+ .stream()
+ .map(RaftSessionMetadata::serviceName)
+ .collect(Collectors.toSet());
}
@Override
public Set<String> getAsyncAtomicCounterNames() {
- return client.keys(DistributedLong.class).join();
+ return client.metadata().getSessions(DistributedPrimitive.Type.COUNTER.name())
+ .join()
+ .stream()
+ .map(RaftSessionMetadata::serviceName)
+ .collect(Collectors.toSet());
}
@Override
public Set<String> getWorkQueueNames() {
- return client.keys(AtomixWorkQueue.class).join();
+ return client.metadata().getSessions(DistributedPrimitive.Type.WORK_QUEUE.name())
+ .join()
+ .stream()
+ .map(RaftSessionMetadata::serviceName)
+ .collect(Collectors.toSet());
}
@Override
public boolean isOpen() {
- return resourceClient.client().state() != State.CLOSED;
+ return client != null;
}
/**
@@ -325,35 +330,14 @@
* @return partition client information
*/
public PartitionClientInfo clientInfo() {
- return new PartitionClientInfo(partition.getId(),
- partition.getMembers(),
- resourceClient.client().session().id(),
- mapper.apply(resourceClient.client().state()));
+ return new PartitionClientInfo(partition.getId(), partition.getMembers());
}
- private ResourceClient newResourceClient(Transport transport,
- io.atomix.catalyst.serializer.Serializer serializer,
- Collection<ResourceType> resourceTypes) {
- ResourceRegistry registry = new ResourceRegistry();
- resourceTypes.forEach(registry::register);
- CopycatClient copycatClient = CopycatClient.builder()
- .withServerSelectionStrategy(ServerSelectionStrategies.ANY)
- .withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF)
- .withRecoveryStrategy(RecoveryStrategies.RECOVER)
- .withTransport(transport)
- .withSerializer(serializer)
+ private RaftClient newRaftClient(RaftClientProtocol protocol) {
+ return RaftClient.newBuilder()
+ .withClientId("partition-" + partition.getId())
+ .withMemberId(MemberId.from(localMemberId.id()))
+ .withProtocol(protocol)
.build();
- copycatClient.serializer().resolve(new ResourceManagerTypeResolver());
- for (ResourceType type : registry.types()) {
- try {
- type.factory()
- .newInstance()
- .createSerializableTypeResolver()
- .resolve(copycatClient.serializer().registry());
- } catch (InstantiationException | IllegalAccessException e) {
- throw new ResourceManagerException(e);
- }
- }
- return new ResourceClient(new OnosCopycatClient(copycatClient, 5, 100));
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
index 9305a31..9ae34d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
@@ -15,13 +15,12 @@
*/
package org.onosproject.store.primitives.impl;
-import io.atomix.copycat.server.cluster.Member;
-
import java.util.Collection;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import io.atomix.protocols.raft.cluster.RaftMember;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.service.PartitionInfo;
@@ -34,15 +33,15 @@
public class StoragePartitionDetails {
private final PartitionId partitionId;
- private final Set<Member> activeMembers;
- private final Set<Member> configuredMembers;
- private final Member leader;
+ private final Set<RaftMember> activeMembers;
+ private final Set<RaftMember> configuredMembers;
+ private final RaftMember leader;
private final long leaderTerm;
public StoragePartitionDetails(PartitionId partitionId,
- Collection<Member> activeMembers,
- Collection<Member> configuredMembers,
- Member leader,
+ Collection<RaftMember> activeMembers,
+ Collection<RaftMember> configuredMembers,
+ RaftMember leader,
long leaderTerm) {
this.partitionId = partitionId;
this.activeMembers = ImmutableSet.copyOf(activeMembers);
@@ -55,7 +54,7 @@
* Returns the set of active members.
* @return active members
*/
- public Set<Member> activeMembers() {
+ public Set<RaftMember> activeMembers() {
return activeMembers;
}
@@ -63,7 +62,7 @@
* Returns the set of configured members.
* @return configured members
*/
- public Set<Member> configuredMembers() {
+ public Set<RaftMember> configuredMembers() {
return configuredMembers;
}
@@ -71,7 +70,7 @@
* Returns the partition leader.
* @return leader
*/
- public Member leader() {
+ public RaftMember leader() {
return leader;
}
@@ -98,8 +97,8 @@
* @return partition info
*/
public PartitionInfo toPartitionInfo() {
- Function<Member, String> memberToString =
- m -> m == null ? "none" : String.format("%s:%d", m.address().host(), m.address().port());
+ Function<RaftMember, String> memberToString =
+ m -> m == null ? "none" : m.memberId().toString();
return new PartitionInfo(partitionId.toString(),
leaderTerm,
activeMembers.stream().map(memberToString).collect(Collectors.toList()),
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index f0c0609..c1ea37c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -15,24 +15,23 @@
*/
package org.onosproject.store.primitives.impl;
-import static org.slf4j.LoggerFactory.getLogger;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Transport;
-import io.atomix.copycat.server.CopycatServer;
-import io.atomix.copycat.server.storage.Storage;
-import io.atomix.copycat.server.storage.StorageLevel;
-import io.atomix.manager.internal.ResourceManagerState;
-import io.atomix.manager.util.ResourceManagerTypeResolver;
-
import java.io.File;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+import io.atomix.protocols.raft.RaftServer;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.storage.StorageLevel;
+import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
import org.onosproject.store.service.PartitionInfo;
+import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
+import static org.slf4j.LoggerFactory.getLogger;
+
/**
* {@link StoragePartition} server.
*/
@@ -41,36 +40,34 @@
private final Logger log = getLogger(getClass());
private static final int MAX_ENTRIES_PER_LOG_SEGMENT = 32768;
+ private final MemberId localMemberId;
private final StoragePartition partition;
- private final Address localAddress;
- private final Supplier<Transport> transport;
- private final Serializer serializer;
+ private final Supplier<RaftServerProtocol> protocol;
private final File dataFolder;
- private CopycatServer server;
+ private RaftServer server;
- public StoragePartitionServer(Address localAddress,
+ public StoragePartitionServer(
StoragePartition partition,
- Serializer serializer,
- Supplier<Transport> transport,
+ MemberId localMemberId,
+ Supplier<RaftServerProtocol> protocol,
File dataFolder) {
this.partition = partition;
- this.localAddress = localAddress;
- this.serializer = serializer;
- this.transport = transport;
+ this.localMemberId = localMemberId;
+ this.protocol = protocol;
this.dataFolder = dataFolder;
}
@Override
public CompletableFuture<Void> open() {
- CompletableFuture<CopycatServer> serverOpenFuture;
- if (partition.getMemberAddresses().contains(localAddress)) {
+ CompletableFuture<RaftServer> serverOpenFuture;
+ if (partition.getMemberIds().contains(localMemberId)) {
if (server != null && server.isRunning()) {
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
server = buildServer();
}
- serverOpenFuture = server.bootstrap(partition.getMemberAddresses());
+ serverOpenFuture = server.bootstrap(partition.getMemberIds());
} else {
serverOpenFuture = CompletableFuture.completedFuture(null);
}
@@ -96,24 +93,21 @@
return server.leave();
}
- private CopycatServer buildServer() {
- CopycatServer server = CopycatServer.builder(localAddress)
+ private RaftServer buildServer() {
+ RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
.withName("partition-" + partition.getId())
- .withSerializer(serializer.clone())
- .withTransport(transport.get())
- .withStateMachine(ResourceManagerState::new)
- .withStorage(Storage.builder()
+ .withProtocol(protocol.get())
+ .withStorage(RaftStorage.newBuilder()
.withStorageLevel(StorageLevel.DISK)
- .withCompactionThreads(1)
+ .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
.withDirectory(dataFolder)
.withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
- .build())
- .build();
- server.serializer().resolve(new ResourceManagerTypeResolver());
- return server;
+ .build());
+ StoragePartition.RAFT_SERVICES.forEach(builder::addService);
+ return builder.build();
}
- public CompletableFuture<Void> join(Collection<Address> otherMembers) {
+ public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
server = buildServer();
return server.join(otherMembers).whenComplete((r, e) -> {
if (e == null) {
@@ -135,9 +129,9 @@
*/
public PartitionInfo info() {
return new StoragePartitionDetails(partition.getId(),
- server.cluster().members(),
- server.cluster().members(),
- server.cluster().leader(),
- server.cluster().term()).toPartitionInfo();
+ server.cluster().getMembers(),
+ server.cluster().getMembers(),
+ server.cluster().getLeader(),
+ server.cluster().getTerm()).toPartitionInfo();
}
}