blob: c561221cc28dea375c06534e77c6754e91409837 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
3import static com.google.common.base.Preconditions.checkNotNull;
4import static org.slf4j.LoggerFactory.getLogger;
5
6import java.util.ArrayList;
7import java.util.Arrays;
8import java.util.Collection;
9import java.util.HashMap;
10import java.util.HashSet;
11import java.util.LinkedList;
12import java.util.Vector;
13
14import net.kuujo.copycat.cluster.TcpClusterConfig;
15import net.kuujo.copycat.cluster.TcpMember;
16import net.kuujo.copycat.internal.log.ConfigurationEntry;
17import net.kuujo.copycat.internal.log.CopycatEntry;
18import net.kuujo.copycat.internal.log.OperationEntry;
19import net.kuujo.copycat.internal.log.SnapshotEntry;
20import net.kuujo.copycat.protocol.PingRequest;
21import net.kuujo.copycat.protocol.PingResponse;
22import net.kuujo.copycat.protocol.PollRequest;
23import net.kuujo.copycat.protocol.PollResponse;
24import net.kuujo.copycat.protocol.Response.Status;
25import net.kuujo.copycat.protocol.SubmitRequest;
26import net.kuujo.copycat.protocol.SubmitResponse;
27import net.kuujo.copycat.protocol.SyncRequest;
28import net.kuujo.copycat.protocol.SyncResponse;
29import net.kuujo.copycat.spi.protocol.Protocol;
30import net.kuujo.copycat.spi.protocol.ProtocolClient;
31import net.kuujo.copycat.spi.protocol.ProtocolServer;
32
33import org.apache.felix.scr.annotations.Activate;
34import org.apache.felix.scr.annotations.Component;
35import org.apache.felix.scr.annotations.Deactivate;
36import org.apache.felix.scr.annotations.Reference;
37import org.apache.felix.scr.annotations.ReferenceCardinality;
38import org.apache.felix.scr.annotations.Service;
39import org.onlab.onos.cluster.ClusterService;
40import org.onlab.onos.cluster.ControllerNode;
41import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
42import org.onlab.onos.store.cluster.messaging.MessageSubject;
43import org.onlab.onos.store.serializers.ImmutableListSerializer;
44import org.onlab.onos.store.serializers.ImmutableMapSerializer;
45import org.onlab.onos.store.serializers.ImmutableSetSerializer;
46import org.onlab.onos.store.serializers.KryoSerializer;
47import org.onlab.onos.store.service.ReadRequest;
48import org.onlab.onos.store.service.ReadResult;
49import org.onlab.onos.store.service.VersionedValue;
50import org.onlab.onos.store.service.WriteRequest;
51import org.onlab.onos.store.service.WriteResult;
52import org.onlab.util.KryoNamespace;
53import org.slf4j.Logger;
54
55import com.esotericsoftware.kryo.Kryo;
56import com.esotericsoftware.kryo.io.Input;
57import com.esotericsoftware.kryo.serializers.CollectionSerializer;
58import com.google.common.collect.ImmutableList;
59import com.google.common.collect.ImmutableMap;
60import com.google.common.collect.ImmutableSet;
61
62/**
Madan Jampanidfbfa182014-11-04 22:06:41 -080063 * ONOS Cluster messaging based Copycat protocol.
Madan Jampani9b19a822014-11-04 21:37:13 -080064 */
Madan Jampani9b19a822014-11-04 21:37:13 -080065@Component(immediate = true)
66@Service
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080067public class ClusterMessagingProtocol
68 implements DatabaseProtocolService, Protocol<TcpMember> {
Madan Jampani9b19a822014-11-04 21:37:13 -080069
70 private final Logger log = getLogger(getClass());
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080073 protected ClusterService clusterService;
Madan Jampani9b19a822014-11-04 21:37:13 -080074
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080076 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani9b19a822014-11-04 21:37:13 -080077
78 public static final MessageSubject COPYCAT_PING =
79 new MessageSubject("copycat-raft-consensus-ping");
80 public static final MessageSubject COPYCAT_SYNC =
81 new MessageSubject("copycat-raft-consensus-sync");
82 public static final MessageSubject COPYCAT_POLL =
83 new MessageSubject("copycat-raft-consensus-poll");
84 public static final MessageSubject COPYCAT_SUBMIT =
85 new MessageSubject("copycat-raft-consensus-submit");
86
87 private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
88 .register(PingRequest.class)
89 .register(PingResponse.class)
90 .register(PollRequest.class)
91 .register(PollResponse.class)
92 .register(SyncRequest.class)
93 .register(SyncResponse.class)
94 .register(SubmitRequest.class)
95 .register(SubmitResponse.class)
96 .register(Status.class)
97 .register(ConfigurationEntry.class)
98 .register(SnapshotEntry.class)
99 .register(CopycatEntry.class)
100 .register(OperationEntry.class)
101 .register(TcpClusterConfig.class)
102 .register(TcpMember.class)
103 .build();
104
105 private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
106 .register(ReadRequest.class)
107 .register(WriteRequest.class)
108 .register(InternalReadResult.class)
109 .register(InternalWriteResult.class)
110 .register(InternalReadResult.Status.class)
111 .register(WriteResult.class)
112 .register(ReadResult.class)
113 .register(InternalWriteResult.Status.class)
114 .register(VersionedValue.class)
115 .build();
116
117 public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
118 .register(Arrays.asList().getClass(), new CollectionSerializer() {
119 @Override
120 @SuppressWarnings("rawtypes")
121 protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
122 return new ArrayList();
123 }
124 })
125 .register(ImmutableMap.class, new ImmutableMapSerializer())
126 .register(ImmutableList.class, new ImmutableListSerializer())
127 .register(ImmutableSet.class, new ImmutableSetSerializer())
128 .register(
129 Vector.class,
130 ArrayList.class,
131 Arrays.asList().getClass(),
132 HashMap.class,
133 HashSet.class,
134 LinkedList.class,
135 byte[].class)
136 .build();
137
138 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
139 @Override
140 protected void setupKryoPool() {
141 serializerPool = KryoNamespace.newBuilder()
142 .register(COPYCAT)
143 .register(COMMON)
144 .register(DATABASE)
145 .build()
146 .populate(1);
147 }
148 };
149
150 @Activate
151 public void activate() {
152 log.info("Started.");
153 }
154
155 @Deactivate
156 public void deactivate() {
157 log.info("Stopped.");
158 }
159
160 @Override
161 public ProtocolServer createServer(TcpMember member) {
162 return new ClusterMessagingProtocolServer(clusterCommunicator);
163 }
164
165 @Override
166 public ProtocolClient createClient(TcpMember member) {
167 ControllerNode node = getControllerNode(member.host(), member.port());
168 checkNotNull(node, "A valid controller node is expected");
169 return new ClusterMessagingProtocolClient(
170 clusterCommunicator, node);
171 }
172
173 private ControllerNode getControllerNode(String host, int port) {
174 for (ControllerNode node : clusterService.getNodes()) {
175 if (node.ip().toString().equals(host) && node.tcpPort() == port) {
176 return node;
177 }
178 }
179 return null;
180 }
181}