Added custom transport implementaion (for Catalyst Transport) for all copycat specific communication
Change-Id: I801d973b7c3412f6a8efcec77fe73fc480b2ce6e
diff --git a/core/store/pom.xml b/core/store/pom.xml
index 9b3067c..518acd7 100644
--- a/core/store/pom.xml
+++ b/core/store/pom.xml
@@ -32,9 +32,10 @@
<description>ONOS Core Store subsystem</description>
<modules>
+ <module>primitives</module>
<module>dist</module>
<module>persistence</module>
- <module>serializers</module>
+ <module>serializers</module>
</modules>
<dependencies>
diff --git a/core/store/primitives/pom.xml b/core/store/primitives/pom.xml
new file mode 100644
index 0000000..a781198
--- /dev/null
+++ b/core/store/primitives/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2016 Open Networking Laboratory
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-store</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>onos-core-primitives</artifactId>
+ <packaging>bundle</packaging>
+
+ <description>ONOS distributed state management primitives</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ </dependency>
+
+ <!-- for shaded atomix/copycat -->
+ <dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-thirdparty</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
new file mode 100644
index 0000000..4c69605
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.onosproject.store.cluster.messaging.MessagingService;
+
+import io.atomix.catalyst.transport.Client;
+import io.atomix.catalyst.transport.Server;
+import io.atomix.catalyst.transport.Transport;
+
+/**
+ * Custom {@link Transport transport} for Copycat interactions
+ * built on top of {@link MessagingService}.
+ *
+ * @see CopycatTransportServer
+ * @see CopycatTransportClient
+ */
+public class CopycatTransport implements Transport {
+
+ /**
+ * Transport Mode.
+ */
+ public enum Mode {
+ /**
+ * Signifies transport for client -> server interaction.
+ */
+ CLIENT,
+
+ /**
+ * Signified transport for server -> server interaction.
+ */
+ SERVER
+ }
+
+ private final Mode mode;
+ private final String clusterName;
+ private final MessagingService messagingService;
+
+ public CopycatTransport(Mode mode, String clusterName, MessagingService messagingService) {
+ this.mode = checkNotNull(mode);
+ this.clusterName = checkNotNull(clusterName);
+ this.messagingService = checkNotNull(messagingService);
+ }
+
+ @Override
+ public Client client() {
+ return new CopycatTransportClient(clusterName,
+ messagingService,
+ mode);
+ }
+
+ @Override
+ public Server server() {
+ return new CopycatTransportServer(clusterName,
+ messagingService);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
new file mode 100644
index 0000000..607e25d
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang.math.RandomUtils;
+import org.onosproject.store.cluster.messaging.MessagingService;
+
+import com.google.common.collect.Sets;
+
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Client;
+import io.atomix.catalyst.transport.Connection;
+import io.atomix.catalyst.util.concurrent.ThreadContext;
+
+/**
+ * {@link Client} implementation for {@link CopycatTransport}.
+ */
+public class CopycatTransportClient implements Client {
+
+ private final String clusterName;
+ private final MessagingService messagingService;
+ private final CopycatTransport.Mode mode;
+ private final ThreadContext context;
+ private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
+
+ CopycatTransportClient(String clusterName, MessagingService messagingService, CopycatTransport.Mode mode) {
+ this.clusterName = checkNotNull(clusterName);
+ this.messagingService = checkNotNull(messagingService);
+ this.mode = checkNotNull(mode);
+ this.context = ThreadContext.currentContextOrThrow();
+ }
+
+ @Override
+ public CompletableFuture<Connection> connect(Address remoteAddress) {
+ CopycatTransportConnection connection = new CopycatTransportConnection(
+ nextConnectionId(),
+ CopycatTransport.Mode.CLIENT,
+ clusterName,
+ remoteAddress,
+ messagingService,
+ context);
+ if (mode == CopycatTransport.Mode.CLIENT) {
+ connection.setBidirectional();
+ }
+ connections.add(connection);
+ return CompletableFuture.supplyAsync(() -> connection, context.executor());
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ return CompletableFuture.allOf(connections.stream().map(Connection::close).toArray(CompletableFuture[]::new));
+ }
+
+ private long nextConnectionId() {
+ return RandomUtils.nextLong();
+ }
+ }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
new file mode 100644
index 0000000..7b5d42a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import org.apache.commons.io.IOUtils;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.Tools;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingService;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Connection;
+import io.atomix.catalyst.transport.MessageHandler;
+import io.atomix.catalyst.transport.TransportException;
+import io.atomix.catalyst.util.Assert;
+import io.atomix.catalyst.util.Listener;
+import io.atomix.catalyst.util.Listeners;
+import io.atomix.catalyst.util.ReferenceCounted;
+import io.atomix.catalyst.util.concurrent.ThreadContext;
+
+/**
+ * {@link Connection} implementation for CopycatTransport.
+ */
+public class CopycatTransportConnection implements Connection {
+
+ private final Listeners<Throwable> exceptionListeners = new Listeners<>();
+ private final Listeners<Connection> closeListeners = new Listeners<>();
+
+ static final byte SUCCESS = 0x03;
+ static final byte FAILURE = 0x04;
+
+ private final long connectionId;
+ private CopycatTransport.Mode mode;
+ private final Address remoteAddress;
+ private final MessagingService messagingService;
+ private final String outboundMessageSubject;
+ private final String inboundMessageSubject;
+ private final ThreadContext context;
+ private final Map<Class<?>, InternalHandler> handlers = Maps.newConcurrentMap();
+ private final AtomicInteger messagesSent = new AtomicInteger(0);
+ private final AtomicInteger sendFailures = new AtomicInteger(0);
+ private final AtomicInteger messagesReceived = new AtomicInteger(0);
+ private final AtomicInteger receiveFailures = new AtomicInteger(0);
+
+ CopycatTransportConnection(long connectionId,
+ CopycatTransport.Mode mode,
+ String clusterName,
+ Address address,
+ MessagingService messagingService,
+ ThreadContext context) {
+ this.connectionId = connectionId;
+ this.mode = checkNotNull(mode);
+ this.remoteAddress = checkNotNull(address);
+ this.messagingService = checkNotNull(messagingService);
+ if (mode == CopycatTransport.Mode.CLIENT) {
+ this.outboundMessageSubject = String.format("onos-copycat-%s", clusterName);
+ this.inboundMessageSubject = String.format("onos-copycat-%s-%d", clusterName, connectionId);
+ } else {
+ this.outboundMessageSubject = String.format("onos-copycat-%s-%d", clusterName, connectionId);
+ this.inboundMessageSubject = String.format("onos-copycat-%s", clusterName);
+ }
+ this.context = checkNotNull(context);
+ }
+
+ public void setBidirectional() {
+ messagingService.registerHandler(inboundMessageSubject, (sender, payload) -> {
+ try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
+ if (input.readLong() != connectionId) {
+ throw new IllegalStateException("Invalid connection Id");
+ }
+ return handle(IOUtils.toByteArray(input));
+ } catch (IOException e) {
+ Throwables.propagate(e);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public <T, U> CompletableFuture<U> send(T message) {
+ ThreadContext context = ThreadContext.currentContextOrThrow();
+ CompletableFuture<U> result = new CompletableFuture<>();
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ new DataOutputStream(baos).writeLong(connectionId);
+ context.serializer().writeObject(message, baos);
+ if (message instanceof ReferenceCounted) {
+ ((ReferenceCounted<?>) message).release();
+ }
+ messagingService.sendAndReceive(toEndpoint(remoteAddress),
+ outboundMessageSubject,
+ baos.toByteArray(),
+ context.executor())
+ .whenComplete((r, e) -> {
+ if (e == null) {
+ messagesSent.incrementAndGet();
+ } else {
+ sendFailures.incrementAndGet();
+ }
+ handleResponse(r, e, result, context);
+ });
+ } catch (Exception e) {
+ result.completeExceptionally(new TransportException("Failed to send request", e));
+ }
+ return result;
+ }
+
+ private <T> void handleResponse(byte[] response,
+ Throwable error,
+ CompletableFuture<T> future,
+ ThreadContext context) {
+ if (error != null) {
+ context.execute(() -> future.completeExceptionally(error));
+ return;
+ }
+ checkNotNull(response);
+ InputStream input = new ByteArrayInputStream(response);
+ try {
+ byte status = (byte) input.read();
+ if (status == FAILURE) {
+ Throwable t = context.serializer().readObject(input);
+ context.execute(() -> future.completeExceptionally(t));
+ } else {
+ context.execute(() -> future.complete(context.serializer().readObject(input)));
+ }
+ } catch (IOException e) {
+ context.execute(() -> future.completeExceptionally(e));
+ }
+ }
+
+ @Override
+ public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
+ Assert.notNull(type, "type");
+ handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
+ return null;
+ }
+
+ public CompletableFuture<byte[]> handle(byte[] message) {
+ try {
+ Object request = context.serializer().readObject(new ByteArrayInputStream(message));
+ InternalHandler handler = handlers.get(request.getClass());
+ if (handler == null) {
+ return Tools.exceptionalFuture(new IllegalStateException(
+ "No handler registered for " + request.getClass()));
+ }
+ return handler.handle(request).handle((result, error) -> {
+ if (error == null) {
+ messagesReceived.incrementAndGet();
+ } else {
+ receiveFailures.incrementAndGet();
+ }
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ baos.write(error != null ? FAILURE : SUCCESS);
+ context.serializer().writeObject(error != null ? error : result, baos);
+ return baos.toByteArray();
+ } catch (IOException e) {
+ Throwables.propagate(e);
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
+ return exceptionListeners.add(listener);
+ }
+
+ @Override
+ public Listener<Connection> closeListener(Consumer<Connection> listener) {
+ return closeListeners.add(listener);
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ // TODO: need to unregister message handler
+ closeListeners.forEach(listener -> listener.accept(this));
+ if (mode == CopycatTransport.Mode.CLIENT) {
+ messagingService.unregisterHandler(inboundMessageSubject);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(connectionId);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof CopycatTransportConnection)) {
+ return false;
+ }
+
+ return connectionId == ((CopycatTransportConnection) other).connectionId;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("id", connectionId)
+ .add("sent", messagesSent.get())
+ .add("received", messagesReceived.get())
+ .add("sendFailures", sendFailures.get())
+ .add("receiveFailures", receiveFailures.get())
+ .toString();
+ }
+
+ private Endpoint toEndpoint(Address address) {
+ try {
+ return new Endpoint(IpAddress.valueOf(InetAddress.getByName(address.host())), address.port());
+ } catch (UnknownHostException e) {
+ Throwables.propagate(e);
+ return null;
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private final class InternalHandler {
+
+ private final MessageHandler handler;
+ private final ThreadContext context;
+
+ private InternalHandler(MessageHandler handler, ThreadContext context) {
+ this.handler = handler;
+ this.context = context;
+ }
+
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Object> handle(Object message) {
+ CompletableFuture<Object> answer = new CompletableFuture<>();
+ context.execute(() -> handler.handle(message).whenComplete((r, e) -> {
+ if (e != null) {
+ answer.completeExceptionally((Throwable) e);
+ } else {
+ answer.complete(r);
+ }
+ }));
+ return answer;
+ }
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
new file mode 100644
index 0000000..43c2cfc
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import org.apache.commons.io.IOUtils;
+import org.onlab.util.Tools;
+import org.onosproject.store.cluster.messaging.MessagingService;
+
+import com.google.common.collect.Maps;
+
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Connection;
+import io.atomix.catalyst.transport.Server;
+import io.atomix.catalyst.util.concurrent.SingleThreadContext;
+import io.atomix.catalyst.util.concurrent.ThreadContext;
+
+/**
+ * {@link Server} implementation for {@link CopycatTransport}.
+ */
+public class CopycatTransportServer implements Server {
+
+ private final AtomicBoolean listening = new AtomicBoolean(false);
+ private CompletableFuture<Void> listenFuture;
+ private final String clusterName;
+ private final MessagingService messagingService;
+ private final String messageSubject;
+ private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
+
+ CopycatTransportServer(String clusterName, MessagingService messagingService) {
+ this.clusterName = checkNotNull(clusterName);
+ this.messagingService = checkNotNull(messagingService);
+ this.messageSubject = String.format("onos-copycat-%s", clusterName);
+ }
+
+ @Override
+ public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
+ if (listening.get()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ ThreadContext context = ThreadContext.currentContextOrThrow();
+ synchronized (this) {
+ if (listenFuture == null) {
+ listenFuture = new CompletableFuture<>();
+ listen(address, listener, context);
+ }
+ }
+ return listenFuture;
+ }
+
+ public void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
+ messagingService.registerHandler(messageSubject, (sender, payload) -> {
+ try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
+ long connectionId = input.readLong();
+ InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
+ int senderPort = sender.port();
+ Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
+ AtomicBoolean newConnection = new AtomicBoolean(false);
+ CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
+ newConnection.set(true);
+ return new CopycatTransportConnection(connectionId,
+ CopycatTransport.Mode.SERVER,
+ clusterName,
+ senderAddress,
+ messagingService,
+ getOrCreateContext(context));
+ });
+ byte[] request = IOUtils.toByteArray(input);
+ return CompletableFuture.supplyAsync(
+ () -> {
+ if (newConnection.get()) {
+ listener.accept(connection);
+ }
+ return connection;
+ }, context.executor()).thenCompose(c -> c.handle(request));
+ } catch (IOException e) {
+ return Tools.exceptionalFuture(e);
+ }
+ });
+ listening.set(true);
+ context.execute(() -> {
+ listenFuture.complete(null);
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ messagingService.unregisterHandler(messageSubject);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /**
+ * Returns the current execution context or creates one.
+ */
+ private ThreadContext getOrCreateContext(ThreadContext parentContext) {
+ ThreadContext context = ThreadContext.currentContext();
+ if (context != null) {
+ return context;
+ }
+ return new SingleThreadContext("copycat-transport-server-" + clusterName, parentContext.serializer().clone());
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/package-info.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/package-info.java
new file mode 100644
index 0000000..0176cb5
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation for distributed state management primitives.
+ */
+package org.onosproject.store.primitives.impl;