Copycat messaging new happens over the same cluster messaging used for all other ONOS p2p communication
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java
index 449fd71..11cc5ff 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java
@@ -26,7 +26,7 @@
/**
* Performs a write operation on the database.
- * @param request
+ * @param request write request
* @return write result.
* @throws DatabaseException if there is failure in execution write.
*/
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java
index 0db5dfe..33b57d2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java
@@ -17,15 +17,15 @@
}
/**
- * Database table name.
- * @return
+ * Returns database table name.
+ * @return table name.
*/
public String tableName() {
return tableName;
}
/**
- * Database table key.
+ * Returns database table key.
* @return key.
*/
public String key() {
@@ -33,7 +33,7 @@
}
/**
- * value associated with the key.
+ * Returns value associated with the key.
* @return non-null value if the table contains one, null otherwise.
*/
public VersionedValue value() {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java
index ee1d0f0..d88d35e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/VersionedValue.java
@@ -12,8 +12,8 @@
/**
* Creates a new instance with the specified value and version.
- * @param value
- * @param version
+ * @param value value
+ * @param version version
*/
public VersionedValue(byte[] value, long version) {
this.value = value;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
new file mode 100644
index 0000000..2e7fe11
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
@@ -0,0 +1,196 @@
+package org.onlab.onos.store.service.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Vector;
+
+import net.kuujo.copycat.cluster.TcpClusterConfig;
+import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.internal.log.ConfigurationEntry;
+import net.kuujo.copycat.internal.log.CopycatEntry;
+import net.kuujo.copycat.internal.log.OperationEntry;
+import net.kuujo.copycat.internal.log.SnapshotEntry;
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
+import net.kuujo.copycat.protocol.Response.Status;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
+import net.kuujo.copycat.spi.protocol.Protocol;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+import net.kuujo.copycat.spi.protocol.ProtocolServer;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.ImmutableListSerializer;
+import org.onlab.onos.store.serializers.ImmutableMapSerializer;
+import org.onlab.onos.store.serializers.ImmutableSetSerializer;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.VersionedValue;
+import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.WriteResult;
+import org.onlab.util.KryoNamespace;
+import org.slf4j.Logger;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under
+ * the License.
+ */
+
+@Component(immediate = true)
+@Service
+public class ClusterMessagingProtocol implements Protocol<TcpMember> {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ ClusterCommunicationService clusterCommunicator;
+
+ public static final MessageSubject COPYCAT_PING =
+ new MessageSubject("copycat-raft-consensus-ping");
+ public static final MessageSubject COPYCAT_SYNC =
+ new MessageSubject("copycat-raft-consensus-sync");
+ public static final MessageSubject COPYCAT_POLL =
+ new MessageSubject("copycat-raft-consensus-poll");
+ public static final MessageSubject COPYCAT_SUBMIT =
+ new MessageSubject("copycat-raft-consensus-submit");
+
+ private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
+ .register(PingRequest.class)
+ .register(PingResponse.class)
+ .register(PollRequest.class)
+ .register(PollResponse.class)
+ .register(SyncRequest.class)
+ .register(SyncResponse.class)
+ .register(SubmitRequest.class)
+ .register(SubmitResponse.class)
+ .register(Status.class)
+ .register(ConfigurationEntry.class)
+ .register(SnapshotEntry.class)
+ .register(CopycatEntry.class)
+ .register(OperationEntry.class)
+ .register(TcpClusterConfig.class)
+ .register(TcpMember.class)
+ .build();
+
+ private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
+ .register(ReadRequest.class)
+ .register(WriteRequest.class)
+ .register(InternalReadResult.class)
+ .register(InternalWriteResult.class)
+ .register(InternalReadResult.Status.class)
+ .register(WriteResult.class)
+ .register(ReadResult.class)
+ .register(InternalWriteResult.Status.class)
+ .register(VersionedValue.class)
+ .build();
+
+ public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
+ .register(Arrays.asList().getClass(), new CollectionSerializer() {
+ @Override
+ @SuppressWarnings("rawtypes")
+ protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
+ return new ArrayList();
+ }
+ })
+ .register(ImmutableMap.class, new ImmutableMapSerializer())
+ .register(ImmutableList.class, new ImmutableListSerializer())
+ .register(ImmutableSet.class, new ImmutableSetSerializer())
+ .register(
+ Vector.class,
+ ArrayList.class,
+ Arrays.asList().getClass(),
+ HashMap.class,
+ HashSet.class,
+ LinkedList.class,
+ byte[].class)
+ .build();
+
+ public static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(COPYCAT)
+ .register(COMMON)
+ .register(DATABASE)
+ .build()
+ .populate(1);
+ }
+ };
+
+ @Activate
+ public void activate() {
+ log.info("Started.");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped.");
+ }
+
+ @Override
+ public ProtocolServer createServer(TcpMember member) {
+ return new ClusterMessagingProtocolServer(clusterCommunicator);
+ }
+
+ @Override
+ public ProtocolClient createClient(TcpMember member) {
+ ControllerNode node = getControllerNode(member.host(), member.port());
+ checkNotNull(node, "A valid controller node is expected");
+ return new ClusterMessagingProtocolClient(
+ clusterCommunicator, node);
+ }
+
+ private ControllerNode getControllerNode(String host, int port) {
+ for (ControllerNode node : clusterService.getNodes()) {
+ if (node.ip().toString().equals(host) && node.tcpPort() == port) {
+ return node;
+ }
+ }
+ return null;
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
new file mode 100644
index 0000000..f638444
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -0,0 +1,165 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.slf4j.Logger;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class ClusterMessagingProtocolClient implements ProtocolClient {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final ThreadFactory THREAD_FACTORY =
+ new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
+
+ public static final long RETRY_INTERVAL_MILLIS = 2000;
+
+ private final ClusterCommunicationService clusterCommunicator;
+ private final ControllerNode remoteNode;
+
+ // FIXME: Thread pool sizing.
+ private static final ScheduledExecutorService THREAD_POOL =
+ new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
+
+ public ClusterMessagingProtocolClient(
+ ClusterCommunicationService clusterCommunicator,
+ ControllerNode remoteNode) {
+ this.clusterCommunicator = clusterCommunicator;
+ this.remoteNode = remoteNode;
+ }
+
+ @Override
+ public CompletableFuture<PingResponse> ping(PingRequest request) {
+ return requestReply(request);
+ }
+
+ @Override
+ public CompletableFuture<SyncResponse> sync(SyncRequest request) {
+ return requestReply(request);
+ }
+
+ @Override
+ public CompletableFuture<PollResponse> poll(PollRequest request) {
+ return requestReply(request);
+ }
+
+ @Override
+ public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
+ return requestReply(request);
+ }
+
+ @Override
+ public CompletableFuture<Void> connect() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ public <I> MessageSubject messageType(I input) {
+ Class<?> clazz = input.getClass();
+ if (clazz.equals(PollRequest.class)) {
+ return ClusterMessagingProtocol.COPYCAT_POLL;
+ } else if (clazz.equals(SyncRequest.class)) {
+ return ClusterMessagingProtocol.COPYCAT_SYNC;
+ } else if (clazz.equals(SubmitRequest.class)) {
+ return ClusterMessagingProtocol.COPYCAT_SUBMIT;
+ } else if (clazz.equals(PingRequest.class)) {
+ return ClusterMessagingProtocol.COPYCAT_PING;
+ } else {
+ throw new IllegalArgumentException("Unknown class " + clazz.getName());
+ }
+
+ }
+
+ private <I, O> CompletableFuture<O> requestReply(I request) {
+ CompletableFuture<O> future = new CompletableFuture<>();
+ THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
+ return future;
+ }
+
+ private class RPCTask<I, O> implements Runnable {
+
+ private final ClusterMessage message;
+ private final CompletableFuture<O> future;
+
+ public RPCTask(I request, CompletableFuture<O> future) {
+ this.message =
+ new ClusterMessage(
+ null,
+ messageType(request),
+ ClusterMessagingProtocol.SERIALIZER.encode(request));
+ this.future = future;
+ }
+
+ @Override
+ public void run() {
+ try {
+ byte[] response = clusterCommunicator
+ .sendAndReceive(message, remoteNode.id())
+ .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+ future.complete(ClusterMessagingProtocol.SERIALIZER.decode(response));
+
+ } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
+ if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) ||
+ message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) {
+ log.warn("Request to {} failed. Will retry "
+ + "in {} ms", remoteNode, RETRY_INTERVAL_MILLIS);
+ THREAD_POOL.schedule(
+ this,
+ RETRY_INTERVAL_MILLIS,
+ TimeUnit.MILLISECONDS);
+ } else {
+ future.completeExceptionally(e);
+ }
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
new file mode 100644
index 0000000..0449b8a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -0,0 +1,110 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.concurrent.CompletableFuture;
+
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.RequestHandler;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.spi.protocol.ProtocolServer;
+
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.slf4j.Logger;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under
+ * the License.
+ */
+
+public class ClusterMessagingProtocolServer implements ProtocolServer {
+
+ private final Logger log = getLogger(getClass());
+ private RequestHandler handler;
+
+ public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
+
+ clusterCommunicator.addSubscriber(
+ ClusterMessagingProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
+ clusterCommunicator.addSubscriber(
+ ClusterMessagingProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
+ clusterCommunicator.addSubscriber(
+ ClusterMessagingProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
+ clusterCommunicator.addSubscriber(
+ ClusterMessagingProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
+ }
+
+ @Override
+ public void requestHandler(RequestHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public CompletableFuture<Void> listen() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private class CopycatMessageHandler<T> implements ClusterMessageHandler {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
+ if (request.getClass().equals(PingRequest.class)) {
+ handler.ping((PingRequest) request).whenComplete((response, error) -> {
+ try {
+ message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
+ } catch (Exception e) {
+ log.error("Failed to respond to ping request", e);
+ }
+ });
+ } else if (request.getClass().equals(PollRequest.class)) {
+ handler.poll((PollRequest) request).whenComplete((response, error) -> {
+ try {
+ message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
+ } catch (Exception e) {
+ log.error("Failed to respond to poll request", e);
+ }
+ });
+ } else if (request.getClass().equals(SyncRequest.class)) {
+ handler.sync((SyncRequest) request).whenComplete((response, error) -> {
+ try {
+ message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
+ } catch (Exception e) {
+ log.error("Failed to respond to sync request", e);
+ }
+ });
+ } else if (request.getClass().equals(SubmitRequest.class)) {
+ handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
+ try {
+ message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
+ } catch (Exception e) {
+ log.error("Failed to respond to submit request", e);
+ }
+ });
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 3c92800..d07d1d3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -11,37 +11,22 @@
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
-import org.apache.commons.lang3.RandomUtils;
-import org.onlab.netty.Endpoint;
-import org.onlab.netty.NettyMessagingService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.WriteRequest;
public class DatabaseClient {
- private final Endpoint copycatEp;
- ProtocolClient client;
- NettyMessagingService messagingService;
+ private final ProtocolClient client;
- public DatabaseClient(Endpoint copycatEp) {
- this.copycatEp = copycatEp;
+ public DatabaseClient(ProtocolClient client) {
+ this.client = client;
}
private static String nextId() {
return UUID.randomUUID().toString();
}
- public void activate() throws Exception {
- messagingService = new NettyMessagingService(RandomUtils.nextInt(10000, 40000));
- messagingService.activate();
- client = new NettyProtocolClient(copycatEp, messagingService);
- }
-
- public void deactivate() throws Exception {
- messagingService.deactivate();
- }
-
public boolean createTable(String tableName) {
SubmitRequest request =
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 8042cc6..44d3041 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -18,7 +18,6 @@
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onlab.netty.Endpoint;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.service.DatabaseAdminService;
@@ -50,6 +49,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ ClusterMessagingProtocol copycatMessagingProtocol;
+
public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
private Copycat copycat;
@@ -57,15 +59,14 @@
@Activate
public void activate() {
- // FIXME hack tcpPort +1 for copycat communication
TcpMember localMember =
new TcpMember(
clusterService.getLocalNode().ip().toString(),
- clusterService.getLocalNode().tcpPort() + 1);
+ clusterService.getLocalNode().tcpPort());
List<TcpMember> remoteMembers = Lists.newArrayList();
for (ControllerNode node : clusterService.getNodes()) {
- TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort() + 1);
+ TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
if (!member.equals(localMember)) {
remoteMembers.add(member);
}
@@ -84,10 +85,10 @@
ControllerNode thisNode = clusterService.getLocalNode();
Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
- copycat = new Copycat(stateMachine, consensusLog, cluster, new NettyProtocol());
+ copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.start();
- client = new DatabaseClient(new Endpoint(localMember.host(), localMember.port()));
+ client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
log.info("Started.");
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 663e9e4..c974486 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -27,7 +27,7 @@
serializerPool = KryoNamespace.newBuilder()
.register(VersionedValue.class)
.register(State.class)
- .register(NettyProtocol.COMMON)
+ .register(ClusterMessagingProtocol.COMMON)
.build()
.populate(1);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java
deleted file mode 100644
index 9a2259a..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package org.onlab.onos.store.service.impl;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Vector;
-
-import net.kuujo.copycat.cluster.TcpClusterConfig;
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.internal.log.ConfigurationEntry;
-import net.kuujo.copycat.internal.log.CopycatEntry;
-import net.kuujo.copycat.internal.log.OperationEntry;
-import net.kuujo.copycat.internal.log.SnapshotEntry;
-import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PingResponse;
-import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.PollResponse;
-import net.kuujo.copycat.protocol.Response.Status;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
-import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.protocol.SyncResponse;
-import net.kuujo.copycat.spi.protocol.Protocol;
-import net.kuujo.copycat.spi.protocol.ProtocolClient;
-import net.kuujo.copycat.spi.protocol.ProtocolServer;
-
-import org.onlab.onos.store.serializers.ImmutableListSerializer;
-import org.onlab.onos.store.serializers.ImmutableMapSerializer;
-import org.onlab.onos.store.serializers.ImmutableSetSerializer;
-import org.onlab.onos.store.serializers.KryoSerializer;
-import org.onlab.onos.store.service.ReadRequest;
-import org.onlab.onos.store.service.ReadResult;
-import org.onlab.onos.store.service.VersionedValue;
-import org.onlab.onos.store.service.WriteRequest;
-import org.onlab.onos.store.service.WriteResult;
-import org.onlab.util.KryoNamespace;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-/**
- * {@link Protocol} based on {@link org.onlab.netty.NettyMessagingService}.
- */
-public class NettyProtocol implements Protocol<TcpMember> {
-
- public static final String COPYCAT_PING = "copycat-raft-consensus-ping";
- public static final String COPYCAT_SYNC = "copycat-raft-consensus-sync";
- public static final String COPYCAT_POLL = "copycat-raft-consensus-poll";
- public static final String COPYCAT_SUBMIT = "copycat-raft-consensus-submit";
-
- // TODO: make this configurable.
- public static final long RETRY_INTERVAL_MILLIS = 2000;
-
- private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
- .register(PingRequest.class)
- .register(PingResponse.class)
- .register(PollRequest.class)
- .register(PollResponse.class)
- .register(SyncRequest.class)
- .register(SyncResponse.class)
- .register(SubmitRequest.class)
- .register(SubmitResponse.class)
- .register(Status.class)
- .register(ConfigurationEntry.class)
- .register(SnapshotEntry.class)
- .register(CopycatEntry.class)
- .register(OperationEntry.class)
- .register(TcpClusterConfig.class)
- .register(TcpMember.class)
- .build();
-
- // TODO: Move to the right place.
- private static final KryoNamespace CRAFT = KryoNamespace.newBuilder()
- .register(ReadRequest.class)
- .register(WriteRequest.class)
- .register(InternalReadResult.class)
- .register(InternalWriteResult.class)
- .register(InternalReadResult.Status.class)
- .register(WriteResult.class)
- .register(ReadResult.class)
- .register(InternalWriteResult.Status.class)
- .register(VersionedValue.class)
- .build();
-
- public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
- .register(Arrays.asList().getClass(), new CollectionSerializer() {
- @Override
- @SuppressWarnings("rawtypes")
- protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
- return new ArrayList();
- }
- })
- .register(ImmutableMap.class, new ImmutableMapSerializer())
- .register(ImmutableList.class, new ImmutableListSerializer())
- .register(ImmutableSet.class, new ImmutableSetSerializer())
- .register(
- Vector.class,
- ArrayList.class,
- Arrays.asList().getClass(),
- HashMap.class,
- HashSet.class,
- LinkedList.class,
- byte[].class)
- .build();
-
- public static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(COPYCAT)
- .register(COMMON)
- .register(CRAFT)
- .build()
- .populate(1);
- }
- };
-
- private NettyProtocolServer server = null;
-
- // FIXME: This is a total hack.Assumes
- // ProtocolServer is initialized before ProtocolClient
- protected NettyProtocolServer getServer() {
- if (server == null) {
- throw new IllegalStateException("ProtocolServer is not initialized yet!");
- }
- return server;
- }
-
- @Override
- public ProtocolServer createServer(TcpMember member) {
- server = new NettyProtocolServer(member);
- return server;
- }
-
- @Override
- public ProtocolClient createClient(TcpMember member) {
- return new NettyProtocolClient(this, member);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java
deleted file mode 100644
index a791990..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package org.onlab.onos.store.service.impl;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PingResponse;
-import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.PollResponse;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SubmitResponse;
-import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.protocol.SyncResponse;
-import net.kuujo.copycat.spi.protocol.ProtocolClient;
-
-import org.onlab.netty.Endpoint;
-import org.onlab.netty.NettyMessagingService;
-import org.slf4j.Logger;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * {@link NettyMessagingService} based Copycat protocol client.
- */
-public class NettyProtocolClient implements ProtocolClient {
-
- private final Logger log = getLogger(getClass());
- private static final ThreadFactory THREAD_FACTORY =
- new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
-
- // Remote endpoint, this client instance is used
- // for communicating with.
- private final Endpoint remoteEp;
- private final NettyMessagingService messagingService;
-
- // TODO: Is 10 the right number of threads?
- private static final ScheduledExecutorService THREAD_POOL =
- new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
-
- public NettyProtocolClient(NettyProtocol protocol, TcpMember member) {
- this(new Endpoint(member.host(), member.port()), protocol.getServer().getNettyMessagingService());
- }
-
- public NettyProtocolClient(Endpoint remoteEp, NettyMessagingService messagingService) {
- this.remoteEp = remoteEp;
- this.messagingService = messagingService;
- }
-
- @Override
- public CompletableFuture<PingResponse> ping(PingRequest request) {
- return requestReply(request);
- }
-
- @Override
- public CompletableFuture<SyncResponse> sync(SyncRequest request) {
- return requestReply(request);
- }
-
- @Override
- public CompletableFuture<PollResponse> poll(PollRequest request) {
- return requestReply(request);
- }
-
- @Override
- public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
- return requestReply(request);
- }
-
- @Override
- public CompletableFuture<Void> connect() {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public CompletableFuture<Void> close() {
- return CompletableFuture.completedFuture(null);
- }
-
- public <I> String messageType(I input) {
- Class<?> clazz = input.getClass();
- if (clazz.equals(PollRequest.class)) {
- return NettyProtocol.COPYCAT_POLL;
- } else if (clazz.equals(SyncRequest.class)) {
- return NettyProtocol.COPYCAT_SYNC;
- } else if (clazz.equals(SubmitRequest.class)) {
- return NettyProtocol.COPYCAT_SUBMIT;
- } else if (clazz.equals(PingRequest.class)) {
- return NettyProtocol.COPYCAT_PING;
- } else {
- throw new IllegalArgumentException("Unknown class " + clazz.getName());
- }
-
- }
-
- private <I, O> CompletableFuture<O> requestReply(I request) {
- CompletableFuture<O> future = new CompletableFuture<>();
- THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
- return future;
- }
-
- private class RPCTask<I, O> implements Runnable {
-
- private final String messageType;
- private final byte[] payload;
-
- private final CompletableFuture<O> future;
-
- public RPCTask(I request, CompletableFuture<O> future) {
- this.messageType = messageType(request);
- this.payload = NettyProtocol.SERIALIZER.encode(request);
- this.future = future;
- }
-
- @Override
- public void run() {
- try {
- byte[] response = messagingService
- .sendAndReceive(remoteEp, messageType, payload)
- .get(NettyProtocol.RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
- future.complete(NettyProtocol.SERIALIZER.decode(response));
-
- } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
- if (messageType.equals(NettyProtocol.COPYCAT_SYNC) ||
- messageType.equals(NettyProtocol.COPYCAT_PING)) {
- log.warn("Request to {} failed. Will retry "
- + "in {} ms", remoteEp, NettyProtocol.RETRY_INTERVAL_MILLIS);
- THREAD_POOL.schedule(
- this,
- NettyProtocol.RETRY_INTERVAL_MILLIS,
- TimeUnit.MILLISECONDS);
- } else {
- future.completeExceptionally(e);
- }
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java
deleted file mode 100644
index d06999e..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.onlab.onos.store.service.impl;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
-import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.protocol.PingRequest;
-import net.kuujo.copycat.protocol.PollRequest;
-import net.kuujo.copycat.protocol.RequestHandler;
-import net.kuujo.copycat.protocol.SubmitRequest;
-import net.kuujo.copycat.protocol.SyncRequest;
-import net.kuujo.copycat.spi.protocol.ProtocolServer;
-
-import org.onlab.netty.Message;
-import org.onlab.netty.MessageHandler;
-import org.onlab.netty.NettyMessagingService;
-import org.slf4j.Logger;
-
-/**
- * {@link NettyMessagingService} based Copycat protocol server.
- */
-public class NettyProtocolServer implements ProtocolServer {
-
- private final Logger log = getLogger(getClass());
-
- private final NettyMessagingService messagingService;
- private RequestHandler handler;
-
-
- public NettyProtocolServer(TcpMember member) {
- messagingService = new NettyMessagingService(member.host(), member.port());
-
- messagingService.registerHandler(NettyProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
- messagingService.registerHandler(NettyProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
- messagingService.registerHandler(NettyProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
- messagingService.registerHandler(NettyProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
- }
-
- protected NettyMessagingService getNettyMessagingService() {
- return messagingService;
- }
-
- @Override
- public void requestHandler(RequestHandler handler) {
- this.handler = handler;
- }
-
- @Override
- public CompletableFuture<Void> listen() {
- try {
- messagingService.activate();
- return CompletableFuture.completedFuture(null);
- } catch (Exception e) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- future.completeExceptionally(e);
- return future;
- }
- }
-
- @Override
- public CompletableFuture<Void> close() {
- CompletableFuture<Void> future = new CompletableFuture<>();
- try {
- messagingService.deactivate();
- future.complete(null);
- return future;
- } catch (Exception e) {
- future.completeExceptionally(e);
- return future;
- }
- }
-
- private class CopycatMessageHandler<T> implements MessageHandler {
-
- @Override
- public void handle(Message message) throws IOException {
- T request = NettyProtocol.SERIALIZER.decode(message.payload());
- if (request.getClass().equals(PingRequest.class)) {
- handler.ping((PingRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(NettyProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to ping request", e);
- }
- });
- } else if (request.getClass().equals(PollRequest.class)) {
- handler.poll((PollRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(NettyProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to poll request", e);
- }
- });
- } else if (request.getClass().equals(SyncRequest.class)) {
- handler.sync((SyncRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(NettyProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to sync request", e);
- }
- });
- } else if (request.getClass().equals(SubmitRequest.class)) {
- handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(NettyProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to submit request", e);
- }
- });
- }
- }
- }
-}