blob: 58c268cecad5d098e8c3a94ec4ec21bdaedc388e [file] [log] [blame]
Yuta HIGUCHI24a086b2014-09-21 23:28:41 -07001package org.onlab.util;
2
3import java.util.ArrayList;
4import java.util.List;
5import java.util.concurrent.ConcurrentLinkedQueue;
6
7import org.apache.commons.lang3.tuple.Pair;
8
9import com.esotericsoftware.kryo.Kryo;
10import com.esotericsoftware.kryo.Serializer;
11import com.esotericsoftware.kryo.io.Input;
12import com.esotericsoftware.kryo.io.Output;
13import com.google.common.collect.ImmutableList;
14
15// TODO Add tests for this class.
16/**
17 * Pool of Kryo instances, with classes pre-registered.
18 */
19//@ThreadSafe
20public final class KryoPool {
21
22 /**
23 * Default buffer size used for serialization.
24 *
25 * @see #serialize(Object)
26 */
27 public static final int DEFAULT_BUFFER_SIZE = 1 * 1000 * 1000;
28
29 private final ConcurrentLinkedQueue<Kryo> pool = new ConcurrentLinkedQueue<>();
30 private final ImmutableList<Pair<Class<?>, Serializer<?>>> registeredTypes;
31 private final boolean registrationRequired;
32
33 /**
34 * KryoPool builder.
35 */
36 //@NotThreadSafe
37 public static final class Builder {
38
39 private final List<Pair<Class<?>, Serializer<?>>> types = new ArrayList<>();
40
41 /**
42 * Builds a {@link KryoPool} instance.
43 *
44 * @return KryoPool
45 */
46 public KryoPool build() {
47 return new KryoPool(types);
48 }
49
50 /**
51 * Registers classes to be serialized using Kryo default serializer.
52 *
53 * @param expectedTypes list of classes
54 * @return this
55 */
56 public Builder register(final Class<?>... expectedTypes) {
57 for (Class<?> clazz : expectedTypes) {
58 types.add(Pair.<Class<?>, Serializer<?>>of(clazz, null));
59 }
60 return this;
61 }
62
63 /**
64 * Registers a class and it's serializer.
65 *
66 * @param clazz the class to register
67 * @param serializer serializer to use for the class
68 * @return this
69 */
70 public Builder register(final Class<?> clazz, Serializer<?> serializer) {
71 types.add(Pair.<Class<?>, Serializer<?>>of(clazz, serializer));
72 return this;
73 }
74
75 /**
76 * Registers all the class registered to given KryoPool.
77 *
78 * @param pool KryoPool
79 * @return this
80 */
81 public Builder register(final KryoPool pool) {
82 types.addAll(pool.registeredTypes);
83 return this;
84 }
85 }
86
87 /**
88 * Creates a new {@link KryoPool} builder.
89 *
90 * @return builder
91 */
92 public static Builder newBuilder() {
93 return new Builder();
94 }
95
96 /**
97 * Creates a Kryo instance pool.
98 *
99 * @param registerdTypes types to register
100 */
101 private KryoPool(final List<Pair<Class<?>, Serializer<?>>> registerdTypes) {
102 this.registeredTypes = ImmutableList.copyOf(registerdTypes);
103 // always true for now
104 this.registrationRequired = true;
105 }
106
107 /**
108 * Populates the Kryo pool.
109 *
110 * @param instances to add to the pool
111 * @return this
112 */
113 public KryoPool populate(int instances) {
114 List<Kryo> kryos = new ArrayList<>(instances);
115 for (int i = 0; i < instances; ++i) {
116 kryos.add(newKryoInstance());
117 }
118 pool.addAll(kryos);
119 return this;
120 }
121
122 /**
123 * Gets a Kryo instance from the pool.
124 *
125 * @return Kryo instance
126 */
127 public Kryo getKryo() {
128 Kryo kryo = pool.poll();
129 if (kryo == null) {
130 return newKryoInstance();
131 }
132 return kryo;
133 }
134
135 /**
136 * Returns a Kryo instance to the pool.
137 *
138 * @param kryo instance obtained from this pool.
139 */
140 public void putKryo(Kryo kryo) {
141 if (kryo != null) {
142 pool.add(kryo);
143 }
144 }
145
146 /**
147 * Serializes given object to byte array using Kryo instance in pool.
148 * <p>
149 * Note: Serialized bytes must be smaller than {@link #DEFAULT_BUFFER_SIZE}.
150 *
151 * @param obj Object to serialize
152 * @return serialized bytes
153 */
154 public byte[] serialize(final Object obj) {
155 return serialize(obj, DEFAULT_BUFFER_SIZE);
156 }
157
158 /**
159 * Serializes given object to byte array using Kryo instance in pool.
160 *
161 * @param obj Object to serialize
162 * @param bufferSize maximum size of serialized bytes
163 * @return serialized bytes
164 */
165 public byte[] serialize(final Object obj, final int bufferSize) {
166 Output out = new Output(bufferSize);
167 Kryo kryo = getKryo();
168 try {
169 kryo.writeClassAndObject(out, obj);
170 return out.toBytes();
171 } finally {
172 putKryo(kryo);
173 }
174 }
175
176 /**
177 * Deserializes given byte array to Object using Kryo instance in pool.
178 *
179 * @param bytes serialized bytes
180 * @param <T> deserialized Object type
181 * @return deserialized Object
182 */
183 public <T> T deserialize(final byte[] bytes) {
184 Input in = new Input(bytes);
185 Kryo kryo = getKryo();
186 try {
187 @SuppressWarnings("unchecked")
188 T obj = (T) kryo.readClassAndObject(in);
189 return obj;
190 } finally {
191 putKryo(kryo);
192 }
193 }
194
195
196 /**
197 * Creates a Kryo instance with {@link #registeredTypes} pre-registered.
198 *
199 * @return Kryo instance
200 */
201 private Kryo newKryoInstance() {
202 Kryo kryo = new Kryo();
203 kryo.setRegistrationRequired(registrationRequired);
204 for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
205 if (registry.getRight() == null) {
206 kryo.register(registry.getLeft());
207 } else {
208 kryo.register(registry.getLeft(), registry.getRight());
209 }
210 }
211 return kryo;
212 }
213}