blob: 9a2259abb86ad1b101d32d7f8db0c913029eee11 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import java.util.ArrayList;
4import java.util.Arrays;
5import java.util.Collection;
6import java.util.HashMap;
7import java.util.HashSet;
8import java.util.LinkedList;
9import java.util.Vector;
10
11import net.kuujo.copycat.cluster.TcpClusterConfig;
12import net.kuujo.copycat.cluster.TcpMember;
13import net.kuujo.copycat.internal.log.ConfigurationEntry;
14import net.kuujo.copycat.internal.log.CopycatEntry;
15import net.kuujo.copycat.internal.log.OperationEntry;
16import net.kuujo.copycat.internal.log.SnapshotEntry;
17import net.kuujo.copycat.protocol.PingRequest;
18import net.kuujo.copycat.protocol.PingResponse;
19import net.kuujo.copycat.protocol.PollRequest;
20import net.kuujo.copycat.protocol.PollResponse;
21import net.kuujo.copycat.protocol.Response.Status;
22import net.kuujo.copycat.protocol.SubmitRequest;
23import net.kuujo.copycat.protocol.SubmitResponse;
24import net.kuujo.copycat.protocol.SyncRequest;
25import net.kuujo.copycat.protocol.SyncResponse;
26import net.kuujo.copycat.spi.protocol.Protocol;
27import net.kuujo.copycat.spi.protocol.ProtocolClient;
28import net.kuujo.copycat.spi.protocol.ProtocolServer;
29
30import org.onlab.onos.store.serializers.ImmutableListSerializer;
31import org.onlab.onos.store.serializers.ImmutableMapSerializer;
32import org.onlab.onos.store.serializers.ImmutableSetSerializer;
33import org.onlab.onos.store.serializers.KryoSerializer;
34import org.onlab.onos.store.service.ReadRequest;
35import org.onlab.onos.store.service.ReadResult;
Madan Jampani37c2e702014-11-04 18:11:10 -080036import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080037import org.onlab.onos.store.service.WriteRequest;
38import org.onlab.onos.store.service.WriteResult;
39import org.onlab.util.KryoNamespace;
40
41import com.esotericsoftware.kryo.Kryo;
42import com.esotericsoftware.kryo.io.Input;
43import com.esotericsoftware.kryo.serializers.CollectionSerializer;
44import com.google.common.collect.ImmutableList;
45import com.google.common.collect.ImmutableMap;
46import com.google.common.collect.ImmutableSet;
47
48/**
49 * {@link Protocol} based on {@link org.onlab.netty.NettyMessagingService}.
50 */
51public class NettyProtocol implements Protocol<TcpMember> {
52
53 public static final String COPYCAT_PING = "copycat-raft-consensus-ping";
54 public static final String COPYCAT_SYNC = "copycat-raft-consensus-sync";
55 public static final String COPYCAT_POLL = "copycat-raft-consensus-poll";
56 public static final String COPYCAT_SUBMIT = "copycat-raft-consensus-submit";
57
58 // TODO: make this configurable.
59 public static final long RETRY_INTERVAL_MILLIS = 2000;
60
61 private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
62 .register(PingRequest.class)
63 .register(PingResponse.class)
64 .register(PollRequest.class)
65 .register(PollResponse.class)
66 .register(SyncRequest.class)
67 .register(SyncResponse.class)
68 .register(SubmitRequest.class)
69 .register(SubmitResponse.class)
70 .register(Status.class)
71 .register(ConfigurationEntry.class)
72 .register(SnapshotEntry.class)
73 .register(CopycatEntry.class)
74 .register(OperationEntry.class)
75 .register(TcpClusterConfig.class)
76 .register(TcpMember.class)
77 .build();
78
79 // TODO: Move to the right place.
80 private static final KryoNamespace CRAFT = KryoNamespace.newBuilder()
81 .register(ReadRequest.class)
82 .register(WriteRequest.class)
83 .register(InternalReadResult.class)
84 .register(InternalWriteResult.class)
85 .register(InternalReadResult.Status.class)
86 .register(WriteResult.class)
87 .register(ReadResult.class)
88 .register(InternalWriteResult.Status.class)
89 .register(VersionedValue.class)
90 .build();
91
92 public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
93 .register(Arrays.asList().getClass(), new CollectionSerializer() {
94 @Override
95 @SuppressWarnings("rawtypes")
96 protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
97 return new ArrayList();
98 }
99 })
100 .register(ImmutableMap.class, new ImmutableMapSerializer())
101 .register(ImmutableList.class, new ImmutableListSerializer())
102 .register(ImmutableSet.class, new ImmutableSetSerializer())
103 .register(
104 Vector.class,
105 ArrayList.class,
106 Arrays.asList().getClass(),
107 HashMap.class,
108 HashSet.class,
109 LinkedList.class,
110 byte[].class)
111 .build();
112
113 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
114 @Override
115 protected void setupKryoPool() {
116 serializerPool = KryoNamespace.newBuilder()
117 .register(COPYCAT)
118 .register(COMMON)
119 .register(CRAFT)
120 .build()
121 .populate(1);
122 }
123 };
124
125 private NettyProtocolServer server = null;
126
127 // FIXME: This is a total hack.Assumes
128 // ProtocolServer is initialized before ProtocolClient
129 protected NettyProtocolServer getServer() {
130 if (server == null) {
131 throw new IllegalStateException("ProtocolServer is not initialized yet!");
132 }
133 return server;
134 }
135
136 @Override
137 public ProtocolServer createServer(TcpMember member) {
138 server = new NettyProtocolServer(member);
139 return server;
140 }
141
142 @Override
143 public ProtocolClient createClient(TcpMember member) {
144 return new NettyProtocolClient(this, member);
145 }
146}