package org.onlab.util;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.commons.lang3.tuple.Pair;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.ByteBufferInput;
import com.esotericsoftware.kryo.io.ByteBufferOutput;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableList;

18// TODO Add tests for this class.
19/**
20 * Pool of Kryo instances, with classes pre-registered.
21 */
22//@ThreadSafe
23public final class KryoPool {
24
25 /**
26 * Default buffer size used for serialization.
27 *
28 * @see #serialize(Object)
29 */
30 public static final int DEFAULT_BUFFER_SIZE = 1 * 1000 * 1000;
31
32 private final ConcurrentLinkedQueue<Kryo> pool = new ConcurrentLinkedQueue<>();
33 private final ImmutableList<Pair<Class<?>, Serializer<?>>> registeredTypes;
34 private final boolean registrationRequired;
35
36 /**
37 * KryoPool builder.
38 */
39 //@NotThreadSafe
40 public static final class Builder {
41
42 private final List<Pair<Class<?>, Serializer<?>>> types = new ArrayList<>();
43
44 /**
45 * Builds a {@link KryoPool} instance.
46 *
47 * @return KryoPool
48 */
49 public KryoPool build() {
50 return new KryoPool(types);
51 }
52
53 /**
54 * Registers classes to be serialized using Kryo default serializer.
55 *
56 * @param expectedTypes list of classes
57 * @return this
58 */
59 public Builder register(final Class<?>... expectedTypes) {
60 for (Class<?> clazz : expectedTypes) {
61 types.add(Pair.<Class<?>, Serializer<?>>of(clazz, null));
62 }
63 return this;
64 }
65
66 /**
67 * Registers a class and it's serializer.
68 *
69 * @param clazz the class to register
70 * @param serializer serializer to use for the class
71 * @return this
72 */
73 public Builder register(final Class<?> clazz, Serializer<?> serializer) {
74 types.add(Pair.<Class<?>, Serializer<?>>of(clazz, serializer));
75 return this;
76 }
77
78 /**
79 * Registers all the class registered to given KryoPool.
80 *
81 * @param pool KryoPool
82 * @return this
83 */
84 public Builder register(final KryoPool pool) {
85 types.addAll(pool.registeredTypes);
86 return this;
87 }
88 }
89
90 /**
91 * Creates a new {@link KryoPool} builder.
92 *
93 * @return builder
94 */
95 public static Builder newBuilder() {
96 return new Builder();
97 }
98
99 /**
100 * Creates a Kryo instance pool.
101 *
102 * @param registerdTypes types to register
103 */
104 private KryoPool(final List<Pair<Class<?>, Serializer<?>>> registerdTypes) {
105 this.registeredTypes = ImmutableList.copyOf(registerdTypes);
106 // always true for now
107 this.registrationRequired = true;
108 }
109
110 /**
111 * Populates the Kryo pool.
112 *
113 * @param instances to add to the pool
114 * @return this
115 */
116 public KryoPool populate(int instances) {
117 List<Kryo> kryos = new ArrayList<>(instances);
118 for (int i = 0; i < instances; ++i) {
119 kryos.add(newKryoInstance());
120 }
121 pool.addAll(kryos);
122 return this;
123 }
124
125 /**
126 * Gets a Kryo instance from the pool.
127 *
128 * @return Kryo instance
129 */
130 public Kryo getKryo() {
131 Kryo kryo = pool.poll();
132 if (kryo == null) {
133 return newKryoInstance();
134 }
135 return kryo;
136 }
137
138 /**
139 * Returns a Kryo instance to the pool.
140 *
141 * @param kryo instance obtained from this pool.
142 */
143 public void putKryo(Kryo kryo) {
144 if (kryo != null) {
145 pool.add(kryo);
146 }
147 }
148
149 /**
150 * Serializes given object to byte array using Kryo instance in pool.
151 * <p>
152 * Note: Serialized bytes must be smaller than {@link #DEFAULT_BUFFER_SIZE}.
153 *
154 * @param obj Object to serialize
155 * @return serialized bytes
156 */
157 public byte[] serialize(final Object obj) {
158 return serialize(obj, DEFAULT_BUFFER_SIZE);
159 }
160
161 /**
162 * Serializes given object to byte array using Kryo instance in pool.
163 *
164 * @param obj Object to serialize
165 * @param bufferSize maximum size of serialized bytes
166 * @return serialized bytes
167 */
168 public byte[] serialize(final Object obj, final int bufferSize) {
169 Output out = new Output(bufferSize);
170 Kryo kryo = getKryo();
171 try {
172 kryo.writeClassAndObject(out, obj);
173 return out.toBytes();
174 } finally {
175 putKryo(kryo);
176 }
177 }
178
179 /**
Yuta HIGUCHIf4b107e2014-09-29 17:27:26 -0700180 * Serializes given object to byte buffer using Kryo instance in pool.
181 *
182 * @param obj Object to serialize
183 * @param buffer to write to
184 */
185 public void serialize(final Object obj, final ByteBuffer buffer) {
186 ByteBufferOutput out = new ByteBufferOutput(buffer);
187 Kryo kryo = getKryo();
188 try {
189 kryo.writeClassAndObject(out, obj);
190 } finally {
191 putKryo(kryo);
192 }
193 }
194
195 /**
Yuta HIGUCHI24a086b2014-09-21 23:28:41 -0700196 * Deserializes given byte array to Object using Kryo instance in pool.
197 *
198 * @param bytes serialized bytes
199 * @param <T> deserialized Object type
200 * @return deserialized Object
201 */
202 public <T> T deserialize(final byte[] bytes) {
203 Input in = new Input(bytes);
204 Kryo kryo = getKryo();
205 try {
206 @SuppressWarnings("unchecked")
207 T obj = (T) kryo.readClassAndObject(in);
208 return obj;
209 } finally {
210 putKryo(kryo);
211 }
212 }
213
Yuta HIGUCHIf4b107e2014-09-29 17:27:26 -0700214 /**
215 * Deserializes given byte buffer to Object using Kryo instance in pool.
216 *
217 * @param buffer input with serialized bytes
218 * @param <T> deserialized Object type
219 * @return deserialized Object
220 */
221 public <T> T deserialize(final ByteBuffer buffer) {
222 ByteBufferInput in = new ByteBufferInput(buffer);
223 Kryo kryo = getKryo();
224 try {
225 @SuppressWarnings("unchecked")
226 T obj = (T) kryo.readClassAndObject(in);
227 return obj;
228 } finally {
229 putKryo(kryo);
230 }
231 }
Yuta HIGUCHI24a086b2014-09-21 23:28:41 -0700232
233 /**
234 * Creates a Kryo instance with {@link #registeredTypes} pre-registered.
235 *
236 * @return Kryo instance
237 */
238 private Kryo newKryoInstance() {
239 Kryo kryo = new Kryo();
240 kryo.setRegistrationRequired(registrationRequired);
241 for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
Yuta HIGUCHI533ec322014-09-30 13:29:52 -0700242 final Serializer<?> serializer = registry.getRight();
243 if (serializer == null) {
Yuta HIGUCHI24a086b2014-09-21 23:28:41 -0700244 kryo.register(registry.getLeft());
245 } else {
Yuta HIGUCHI533ec322014-09-30 13:29:52 -0700246 kryo.register(registry.getLeft(), serializer);
247 if (serializer instanceof FamilySerializer) {
248 FamilySerializer<?> fser = (FamilySerializer<?>) serializer;
249 fser.registerFamilies(kryo);
250 }
Yuta HIGUCHI24a086b2014-09-21 23:28:41 -0700251 }
252 }
253 return kryo;
254 }
Yuta HIGUCHI533ec322014-09-30 13:29:52 -0700255
256 /**
257 * Serializer implementation, which required registration of family of Classes.
258 * @param <T> base type of this serializer.
259 */
260 public abstract static class FamilySerializer<T> extends Serializer<T> {
261
262
263 public FamilySerializer(boolean acceptsNull) {
264 super(acceptsNull);
265 }
266
267 public FamilySerializer(boolean acceptsNull, boolean immutable) {
268 super(acceptsNull, immutable);
269 }
270
271 /**
272 * Registers other classes this Serializer supports.
273 *
274 * @param kryo instance to register classes to
275 */
276 public void registerFamilies(Kryo kryo) {
277 }
278 }
Yuta HIGUCHI24a086b2014-09-21 23:28:41 -0700279}