blob: 9855ec6e91393cb6cb888f25620393041b756aaa [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import java.util.ArrayList;
4import java.util.Arrays;
5import java.util.Collection;
6import java.util.HashMap;
7import java.util.HashSet;
8import java.util.LinkedList;
9import java.util.Vector;
10
11import net.kuujo.copycat.cluster.TcpClusterConfig;
12import net.kuujo.copycat.cluster.TcpMember;
13import net.kuujo.copycat.internal.log.ConfigurationEntry;
14import net.kuujo.copycat.internal.log.CopycatEntry;
15import net.kuujo.copycat.internal.log.OperationEntry;
16import net.kuujo.copycat.internal.log.SnapshotEntry;
17import net.kuujo.copycat.protocol.PingRequest;
18import net.kuujo.copycat.protocol.PingResponse;
19import net.kuujo.copycat.protocol.PollRequest;
20import net.kuujo.copycat.protocol.PollResponse;
21import net.kuujo.copycat.protocol.Response.Status;
22import net.kuujo.copycat.protocol.SubmitRequest;
23import net.kuujo.copycat.protocol.SubmitResponse;
24import net.kuujo.copycat.protocol.SyncRequest;
25import net.kuujo.copycat.protocol.SyncResponse;
26import net.kuujo.copycat.spi.protocol.Protocol;
27import net.kuujo.copycat.spi.protocol.ProtocolClient;
28import net.kuujo.copycat.spi.protocol.ProtocolServer;
29
30import org.onlab.onos.store.serializers.ImmutableListSerializer;
31import org.onlab.onos.store.serializers.ImmutableMapSerializer;
32import org.onlab.onos.store.serializers.ImmutableSetSerializer;
33import org.onlab.onos.store.serializers.KryoSerializer;
34import org.onlab.onos.store.service.ReadRequest;
35import org.onlab.onos.store.service.ReadResult;
36import org.onlab.onos.store.service.WriteRequest;
37import org.onlab.onos.store.service.WriteResult;
38import org.onlab.util.KryoNamespace;
39
40import com.esotericsoftware.kryo.Kryo;
41import com.esotericsoftware.kryo.io.Input;
42import com.esotericsoftware.kryo.serializers.CollectionSerializer;
43import com.google.common.collect.ImmutableList;
44import com.google.common.collect.ImmutableMap;
45import com.google.common.collect.ImmutableSet;
46
47/**
48 * {@link Protocol} based on {@link org.onlab.netty.NettyMessagingService}.
49 */
50public class NettyProtocol implements Protocol<TcpMember> {
51
52 public static final String COPYCAT_PING = "copycat-raft-consensus-ping";
53 public static final String COPYCAT_SYNC = "copycat-raft-consensus-sync";
54 public static final String COPYCAT_POLL = "copycat-raft-consensus-poll";
55 public static final String COPYCAT_SUBMIT = "copycat-raft-consensus-submit";
56
57 // TODO: make this configurable.
58 public static final long RETRY_INTERVAL_MILLIS = 2000;
59
60 private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
61 .register(PingRequest.class)
62 .register(PingResponse.class)
63 .register(PollRequest.class)
64 .register(PollResponse.class)
65 .register(SyncRequest.class)
66 .register(SyncResponse.class)
67 .register(SubmitRequest.class)
68 .register(SubmitResponse.class)
69 .register(Status.class)
70 .register(ConfigurationEntry.class)
71 .register(SnapshotEntry.class)
72 .register(CopycatEntry.class)
73 .register(OperationEntry.class)
74 .register(TcpClusterConfig.class)
75 .register(TcpMember.class)
76 .build();
77
78 // TODO: Move to the right place.
79 private static final KryoNamespace CRAFT = KryoNamespace.newBuilder()
80 .register(ReadRequest.class)
81 .register(WriteRequest.class)
82 .register(InternalReadResult.class)
83 .register(InternalWriteResult.class)
84 .register(InternalReadResult.Status.class)
85 .register(WriteResult.class)
86 .register(ReadResult.class)
87 .register(InternalWriteResult.Status.class)
88 .register(VersionedValue.class)
89 .build();
90
91 public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
92 .register(Arrays.asList().getClass(), new CollectionSerializer() {
93 @Override
94 @SuppressWarnings("rawtypes")
95 protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
96 return new ArrayList();
97 }
98 })
99 .register(ImmutableMap.class, new ImmutableMapSerializer())
100 .register(ImmutableList.class, new ImmutableListSerializer())
101 .register(ImmutableSet.class, new ImmutableSetSerializer())
102 .register(
103 Vector.class,
104 ArrayList.class,
105 Arrays.asList().getClass(),
106 HashMap.class,
107 HashSet.class,
108 LinkedList.class,
109 byte[].class)
110 .build();
111
112 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
113 @Override
114 protected void setupKryoPool() {
115 serializerPool = KryoNamespace.newBuilder()
116 .register(COPYCAT)
117 .register(COMMON)
118 .register(CRAFT)
119 .build()
120 .populate(1);
121 }
122 };
123
124 private NettyProtocolServer server = null;
125
126 // FIXME: This is a total hack.Assumes
127 // ProtocolServer is initialized before ProtocolClient
128 protected NettyProtocolServer getServer() {
129 if (server == null) {
130 throw new IllegalStateException("ProtocolServer is not initialized yet!");
131 }
132 return server;
133 }
134
135 @Override
136 public ProtocolServer createServer(TcpMember member) {
137 server = new NettyProtocolServer(member);
138 return server;
139 }
140
141 @Override
142 public ProtocolClient createClient(TcpMember member) {
143 return new NettyProtocolClient(this, member);
144 }
145}