blob: e16838d4d4c917e671af28ffcf558fea1e4e5f7d [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
3import static com.google.common.base.Preconditions.checkNotNull;
4import static org.slf4j.LoggerFactory.getLogger;
5
6import java.util.ArrayList;
7import java.util.Arrays;
8import java.util.Collection;
Madan Jampani7e634e62014-11-09 21:24:13 -08009import java.util.Collections;
Madan Jampani9b19a822014-11-04 21:37:13 -080010import java.util.HashMap;
11import java.util.HashSet;
12import java.util.LinkedList;
13import java.util.Vector;
14
15import net.kuujo.copycat.cluster.TcpClusterConfig;
16import net.kuujo.copycat.cluster.TcpMember;
17import net.kuujo.copycat.internal.log.ConfigurationEntry;
18import net.kuujo.copycat.internal.log.CopycatEntry;
19import net.kuujo.copycat.internal.log.OperationEntry;
20import net.kuujo.copycat.internal.log.SnapshotEntry;
21import net.kuujo.copycat.protocol.PingRequest;
22import net.kuujo.copycat.protocol.PingResponse;
23import net.kuujo.copycat.protocol.PollRequest;
24import net.kuujo.copycat.protocol.PollResponse;
25import net.kuujo.copycat.protocol.Response.Status;
26import net.kuujo.copycat.protocol.SubmitRequest;
27import net.kuujo.copycat.protocol.SubmitResponse;
28import net.kuujo.copycat.protocol.SyncRequest;
29import net.kuujo.copycat.protocol.SyncResponse;
30import net.kuujo.copycat.spi.protocol.Protocol;
31import net.kuujo.copycat.spi.protocol.ProtocolClient;
32import net.kuujo.copycat.spi.protocol.ProtocolServer;
33
34import org.apache.felix.scr.annotations.Activate;
35import org.apache.felix.scr.annotations.Component;
36import org.apache.felix.scr.annotations.Deactivate;
37import org.apache.felix.scr.annotations.Reference;
38import org.apache.felix.scr.annotations.ReferenceCardinality;
39import org.apache.felix.scr.annotations.Service;
40import org.onlab.onos.cluster.ClusterService;
41import org.onlab.onos.cluster.ControllerNode;
42import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
43import org.onlab.onos.store.cluster.messaging.MessageSubject;
44import org.onlab.onos.store.serializers.ImmutableListSerializer;
45import org.onlab.onos.store.serializers.ImmutableMapSerializer;
46import org.onlab.onos.store.serializers.ImmutableSetSerializer;
47import org.onlab.onos.store.serializers.KryoSerializer;
Madan Jampanibd1f0262014-11-12 01:51:25 -080048import org.onlab.onos.store.service.BatchReadRequest;
49import org.onlab.onos.store.service.BatchWriteRequest;
Madan Jampani9b19a822014-11-04 21:37:13 -080050import org.onlab.onos.store.service.ReadRequest;
51import org.onlab.onos.store.service.ReadResult;
Madan Jampanibd1f0262014-11-12 01:51:25 -080052import org.onlab.onos.store.service.ReadStatus;
Madan Jampani9b19a822014-11-04 21:37:13 -080053import org.onlab.onos.store.service.VersionedValue;
54import org.onlab.onos.store.service.WriteRequest;
55import org.onlab.onos.store.service.WriteResult;
Madan Jampanibd1f0262014-11-12 01:51:25 -080056import org.onlab.onos.store.service.WriteStatus;
Madan Jampani9b19a822014-11-04 21:37:13 -080057import org.onlab.util.KryoNamespace;
58import org.slf4j.Logger;
59
60import com.esotericsoftware.kryo.Kryo;
61import com.esotericsoftware.kryo.io.Input;
62import com.esotericsoftware.kryo.serializers.CollectionSerializer;
63import com.google.common.collect.ImmutableList;
64import com.google.common.collect.ImmutableMap;
65import com.google.common.collect.ImmutableSet;
66
67/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080068 * ONOS Cluster messaging based Copycat protocol.
Madan Jampani9b19a822014-11-04 21:37:13 -080069 */
Madan Jampani9b19a822014-11-04 21:37:13 -080070@Component(immediate = true)
71@Service
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080072public class ClusterMessagingProtocol
73 implements DatabaseProtocolService, Protocol<TcpMember> {
Madan Jampani9b19a822014-11-04 21:37:13 -080074
75 private final Logger log = getLogger(getClass());
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080078 protected ClusterService clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080079
80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080081 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani9b19a822014-11-04 21:37:13 -080082
83 public static final MessageSubject COPYCAT_PING =
84 new MessageSubject("copycat-raft-consensus-ping");
85 public static final MessageSubject COPYCAT_SYNC =
86 new MessageSubject("copycat-raft-consensus-sync");
87 public static final MessageSubject COPYCAT_POLL =
88 new MessageSubject("copycat-raft-consensus-poll");
89 public static final MessageSubject COPYCAT_SUBMIT =
90 new MessageSubject("copycat-raft-consensus-submit");
91
92 private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
93 .register(PingRequest.class)
94 .register(PingResponse.class)
95 .register(PollRequest.class)
96 .register(PollResponse.class)
97 .register(SyncRequest.class)
98 .register(SyncResponse.class)
99 .register(SubmitRequest.class)
100 .register(SubmitResponse.class)
101 .register(Status.class)
102 .register(ConfigurationEntry.class)
103 .register(SnapshotEntry.class)
104 .register(CopycatEntry.class)
105 .register(OperationEntry.class)
106 .register(TcpClusterConfig.class)
107 .register(TcpMember.class)
108 .build();
109
110 private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
111 .register(ReadRequest.class)
112 .register(WriteRequest.class)
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800113 .register(WriteRequest.Type.class)
Madan Jampani9b19a822014-11-04 21:37:13 -0800114 .register(InternalReadResult.class)
115 .register(InternalWriteResult.class)
116 .register(InternalReadResult.Status.class)
117 .register(WriteResult.class)
118 .register(ReadResult.class)
Madan Jampanibd1f0262014-11-12 01:51:25 -0800119 .register(BatchReadRequest.class)
120 .register(BatchWriteRequest.class)
121 .register(ReadStatus.class)
122 .register(WriteStatus.class)
Madan Jampani9b19a822014-11-04 21:37:13 -0800123 .register(InternalWriteResult.Status.class)
124 .register(VersionedValue.class)
125 .build();
126
127 public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
128 .register(Arrays.asList().getClass(), new CollectionSerializer() {
129 @Override
130 @SuppressWarnings("rawtypes")
131 protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
132 return new ArrayList();
133 }
134 })
135 .register(ImmutableMap.class, new ImmutableMapSerializer())
136 .register(ImmutableList.class, new ImmutableListSerializer())
137 .register(ImmutableSet.class, new ImmutableSetSerializer())
138 .register(
139 Vector.class,
140 ArrayList.class,
141 Arrays.asList().getClass(),
142 HashMap.class,
143 HashSet.class,
144 LinkedList.class,
Madan Jampani7e634e62014-11-09 21:24:13 -0800145 Collections.singletonList("").getClass(),
Madan Jampani9b19a822014-11-04 21:37:13 -0800146 byte[].class)
147 .build();
148
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800149 // serializer used for CopyCat Protocol
Madan Jampani9b19a822014-11-04 21:37:13 -0800150 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
151 @Override
152 protected void setupKryoPool() {
153 serializerPool = KryoNamespace.newBuilder()
154 .register(COPYCAT)
155 .register(COMMON)
156 .register(DATABASE)
157 .build()
158 .populate(1);
159 }
160 };
161
162 @Activate
163 public void activate() {
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800164 log.info("Started");
Madan Jampani9b19a822014-11-04 21:37:13 -0800165 }
166
167 @Deactivate
168 public void deactivate() {
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800169 log.info("Stopped");
Madan Jampani9b19a822014-11-04 21:37:13 -0800170 }
171
172 @Override
173 public ProtocolServer createServer(TcpMember member) {
174 return new ClusterMessagingProtocolServer(clusterCommunicator);
175 }
176
177 @Override
178 public ProtocolClient createClient(TcpMember member) {
Madan Jampani515865d2014-11-09 22:29:34 -0800179 ControllerNode remoteNode = getControllerNode(member.host(), member.port());
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800180 checkNotNull(remoteNode,
181 "A valid controller node is expected for %s:%s",
182 member.host(), member.port());
Madan Jampani9b19a822014-11-04 21:37:13 -0800183 return new ClusterMessagingProtocolClient(
Madan Jampani515865d2014-11-09 22:29:34 -0800184 clusterCommunicator, clusterService.getLocalNode(), remoteNode);
Madan Jampani9b19a822014-11-04 21:37:13 -0800185 }
186
187 private ControllerNode getControllerNode(String host, int port) {
188 for (ControllerNode node : clusterService.getNodes()) {
189 if (node.ip().toString().equals(host) && node.tcpPort() == port) {
190 return node;
191 }
192 }
193 return null;
194 }
Madan Jampani7e634e62014-11-09 21:24:13 -0800195}