Copycat messaging now happens over the same cluster messaging used for all other ONOS p2p communication.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
new file mode 100644
index 0000000..2e7fe11
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
@@ -0,0 +1,196 @@
+package org.onlab.onos.store.service.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Vector;
+
+import net.kuujo.copycat.cluster.TcpClusterConfig;
+import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.internal.log.ConfigurationEntry;
+import net.kuujo.copycat.internal.log.CopycatEntry;
+import net.kuujo.copycat.internal.log.OperationEntry;
+import net.kuujo.copycat.internal.log.SnapshotEntry;
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
+import net.kuujo.copycat.protocol.Response.Status;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
+import net.kuujo.copycat.spi.protocol.Protocol;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+import net.kuujo.copycat.spi.protocol.ProtocolServer;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.ImmutableListSerializer;
+import org.onlab.onos.store.serializers.ImmutableMapSerializer;
+import org.onlab.onos.store.serializers.ImmutableSetSerializer;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.VersionedValue;
+import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.WriteResult;
+import org.onlab.util.KryoNamespace;
+import org.slf4j.Logger;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under
+ * the License.
+ */
+
+@Component(immediate = true)
+@Service
+public class ClusterMessagingProtocol implements Protocol<TcpMember> {
+
+ private final Logger log = getLogger(getClass()); // instance logger obtained for the runtime class
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ ClusterService clusterService; // supplies the node list scanned by getControllerNode()
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ ClusterCommunicationService clusterCommunicator; // passed to every server and client this class creates
+
+ public static final MessageSubject COPYCAT_PING = // presumably tags PingRequest traffic; users are outside this file — confirm
+ new MessageSubject("copycat-raft-consensus-ping");
+ public static final MessageSubject COPYCAT_SYNC = // presumably tags SyncRequest traffic — confirm
+ new MessageSubject("copycat-raft-consensus-sync");
+ public static final MessageSubject COPYCAT_POLL = // presumably tags PollRequest traffic — confirm
+ new MessageSubject("copycat-raft-consensus-poll");
+ public static final MessageSubject COPYCAT_SUBMIT = // presumably tags SubmitRequest traffic — confirm
+ new MessageSubject("copycat-raft-consensus-submit");
+
+ private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder() // types imported from net.kuujo.copycat
+ .register(PingRequest.class) // protocol request/response pairs
+ .register(PingResponse.class)
+ .register(PollRequest.class)
+ .register(PollResponse.class)
+ .register(SyncRequest.class)
+ .register(SyncResponse.class)
+ .register(SubmitRequest.class)
+ .register(SubmitResponse.class)
+ .register(Status.class) // Response.Status, per the import
+ .register(ConfigurationEntry.class) // entry types from copycat's internal.log package
+ .register(SnapshotEntry.class)
+ .register(CopycatEntry.class)
+ .register(OperationEntry.class)
+ .register(TcpClusterConfig.class) // TCP cluster config and member types
+ .register(TcpMember.class)
+ .build();
+
+ private static final KryoNamespace DATABASE = KryoNamespace.newBuilder() // store-service request/result types
+ .register(ReadRequest.class)
+ .register(WriteRequest.class)
+ .register(InternalReadResult.class) // not imported, so resolved from this package
+ .register(InternalWriteResult.class) // not imported, so resolved from this package
+ .register(InternalReadResult.Status.class)
+ .register(WriteResult.class)
+ .register(ReadResult.class)
+ .register(InternalWriteResult.Status.class)
+ .register(VersionedValue.class)
+ .build();
+
+ public static final KryoNamespace COMMON = KryoNamespace.newBuilder() // JDK and Guava collection types
+ .register(Arrays.asList().getClass(), new CollectionSerializer() { // custom serializer for the Arrays.asList() list class
+ @Override
+ @SuppressWarnings("rawtypes")
+ protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
+ return new ArrayList(); // hands back an ArrayList regardless of `type`
+ }
+ })
+ .register(ImmutableMap.class, new ImmutableMapSerializer()) // Guava immutables each get a dedicated serializer
+ .register(ImmutableList.class, new ImmutableListSerializer())
+ .register(ImmutableSet.class, new ImmutableSetSerializer())
+ .register(
+ Vector.class,
+ ArrayList.class,
+ Arrays.asList().getClass(), // NOTE(review): already registered above with a custom serializer — confirm this repeat is intended
+ HashMap.class,
+ HashSet.class,
+ LinkedList.class,
+ byte[].class)
+ .build();
+
+ public static final KryoSerializer SERIALIZER = new KryoSerializer() { // serializer covering all three namespaces above
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder() // serializerPool is not declared here; presumably inherited from KryoSerializer
+ .register(COPYCAT)
+ .register(COMMON)
+ .register(DATABASE)
+ .build()
+ .populate(1); // NOTE(review): 1 is presumably the initial pool size — confirm against KryoNamespace
+ }
+ };
+
+ @Activate
+ public void activate() {
+ log.info("Started."); // activation does nothing beyond this log line
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped."); // deactivation does nothing beyond this log line
+ }
+
+ @Override
+ public ProtocolServer createServer(TcpMember member) { // `member` is not used by this implementation
+ return new ClusterMessagingProtocolServer(clusterCommunicator); // server is built from the injected communicator only
+ }
+
+    @Override
+    public ProtocolClient createClient(TcpMember member) {
+        ControllerNode node = getControllerNode(member.host(), member.port());
+        // Fail fast, naming the unresolved endpoint so the NullPointerException is diagnosable.
+        checkNotNull(node, "A valid controller node is expected for %s:%s", member.host(), member.port());
+        return new ClusterMessagingProtocolClient(clusterCommunicator, node); // client bound to that node
+    }
+
+    private ControllerNode getControllerNode(String host, int port) {
+        for (ControllerNode candidate : clusterService.getNodes()) { // linear scan, first match wins
+            if (candidate.ip().toString().equals(host) && port == candidate.tcpPort()) {
+                return candidate;
+            }
+        }
+        return null; // no known node has this IP string and TCP port
+    }
+}
\ No newline at end of file