Kryo related fixes
- KryoNamespace to allow control over registration id
Change-Id: Idc2a0e27a09916657c725ee97e4366109144cc66
diff --git a/utils/misc/src/main/java/org/onlab/util/KryoNamespace.java b/utils/misc/src/main/java/org/onlab/util/KryoNamespace.java
index b89544e..63d5363 100644
--- a/utils/misc/src/main/java/org/onlab/util/KryoNamespace.java
+++ b/utils/misc/src/main/java/org/onlab/util/KryoNamespace.java
@@ -15,10 +15,11 @@
*/
package org.onlab.util;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang3.tuple.Pair;
@@ -27,15 +28,17 @@
import com.esotericsoftware.kryo.io.ByteBufferInput;
import com.esotericsoftware.kryo.io.ByteBufferOutput;
import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.pool.KryoCallback;
import com.esotericsoftware.kryo.pool.KryoFactory;
+import com.esotericsoftware.kryo.pool.KryoPool;
+import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
-// TODO Add tests for this class.
/**
* Pool of Kryo instances, with classes pre-registered.
*/
//@ThreadSafe
-public final class KryoNamespace implements KryoFactory {
+public final class KryoNamespace implements KryoFactory, KryoPool {
/**
* Default buffer size used for serialization.
@@ -45,17 +48,35 @@
public static final int DEFAULT_BUFFER_SIZE = 4096;
public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
- private final ConcurrentLinkedQueue<Kryo> pool = new ConcurrentLinkedQueue<>();
- private final ImmutableList<Pair<Class<?>, Serializer<?>>> registeredTypes;
+ /**
+ * ID to use if this KryoNamespace does not define registration id.
+ */
+ public static final int FLOATING_ID = -1;
+
+ /**
+ * Smallest ID free to use for user defined registrations.
+ */
+ public static final int INITIAL_ID = 11;
+
+
+ private final KryoPool pool = new KryoPool.Builder(this)
+ .softReferences()
+ .build();
+
+ private final ImmutableList<RegistrationBlock> registeredBlocks;
+
private final boolean registrationRequired;
+
/**
* KryoNamespace builder.
*/
//@NotThreadSafe
public static final class Builder {
- private final List<Pair<Class<?>, Serializer<?>>> types = new ArrayList<>();
+ private int blockHeadId = INITIAL_ID;
+ private List<Pair<Class<?>, Serializer<?>>> types = new ArrayList<>();
+ private List<RegistrationBlock> blocks = new ArrayList<>();
private boolean registrationRequired = true;
/**
@@ -64,7 +85,27 @@
* @return KryoNamespace
*/
public KryoNamespace build() {
- return new KryoNamespace(types, registrationRequired);
+ if (!types.isEmpty()) {
+ blocks.add(new RegistrationBlock(this.blockHeadId, types));
+ }
+ return new KryoNamespace(blocks, registrationRequired).populate(1);
+ }
+
+ /**
+ * Sets the next Kryo registration Id for following register entries.
+ *
+ * @param id Kryo registration Id
+ * @return this
+ *
+ * @see Kryo#register(Class, Serializer, int)
+ */
+ public Builder nextId(final int id) {
+ if (!types.isEmpty()) {
+ blocks.add(new RegistrationBlock(this.blockHeadId, types));
+ types = new ArrayList<>();
+ }
+ this.blockHeadId = id;
+ return this;
}
/**
@@ -75,7 +116,7 @@
*/
public Builder register(final Class<?>... expectedTypes) {
for (Class<?> clazz : expectedTypes) {
- types.add(Pair.<Class<?>, Serializer<?>>of(clazz, null));
+ types.add(Pair.of(clazz, null));
}
return this;
}
@@ -83,26 +124,54 @@
/**
* Registers a class and it's serializer.
*
- * @param clazz the class to register
+ * @param classes list of classes to register
* @param serializer serializer to use for the class
* @return this
*/
- public Builder register(final Class<?> clazz, Serializer<?> serializer) {
- types.add(Pair.<Class<?>, Serializer<?>>of(clazz, serializer));
+ public Builder register(Serializer<?> serializer, final Class<?>... classes) {
+ for (Class<?> clazz : classes) {
+ types.add(Pair.of(clazz, serializer));
+ }
+ return this;
+ }
+
+ private Builder register(RegistrationBlock block) {
+ if (block.begin() != FLOATING_ID) {
+ // flush pending types
+ nextId(block.begin());
+ blocks.add(block);
+ nextId(block.begin() + block.types().size());
+ } else {
+ // flush pending types
+ final int addedBlockBegin = blockHeadId + types.size();
+ nextId(addedBlockBegin);
+ blocks.add(new RegistrationBlock(addedBlockBegin, block.types()));
+ nextId(addedBlockBegin + block.types().size());
+ }
return this;
}
/**
* Registers all the class registered to given KryoNamespace.
*
- * @param pool KryoNamespace
+ * @param ns KryoNamespace
* @return this
*/
- public Builder register(final KryoNamespace pool) {
- types.addAll(pool.registeredTypes);
+ public Builder register(final KryoNamespace ns) {
+ for (RegistrationBlock block : ns.registeredBlocks) {
+ this.register(block);
+ }
return this;
}
+ /**
+ * Sets the registrationRequired flag.
+ *
+ * @param registrationRequired Kryo's registrationRequired flag
+ * @return this
+ *
+ * @see Kryo#setRegistrationRequired(boolean)
+ */
public Builder setRegistrationRequired(boolean registrationRequired) {
this.registrationRequired = registrationRequired;
return this;
@@ -124,8 +193,8 @@
* @param registeredTypes types to register
* @param registrationRequired
*/
- private KryoNamespace(final List<Pair<Class<?>, Serializer<?>>> registeredTypes, boolean registrationRequired) {
- this.registeredTypes = ImmutableList.copyOf(registeredTypes);
+ private KryoNamespace(final List<RegistrationBlock> registeredTypes, boolean registrationRequired) {
+ this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
this.registrationRequired = registrationRequired;
}
@@ -136,39 +205,14 @@
* @return this
*/
public KryoNamespace populate(int instances) {
- List<Kryo> kryos = new ArrayList<>(instances);
+
for (int i = 0; i < instances; ++i) {
- kryos.add(create());
+ release(create());
}
- pool.addAll(kryos);
return this;
}
/**
- * Gets a Kryo instance from the pool.
- *
- * @return Kryo instance
- */
- public Kryo getKryo() {
- Kryo kryo = pool.poll();
- if (kryo == null) {
- return create();
- }
- return kryo;
- }
-
- /**
- * Returns a Kryo instance to the pool.
- *
- * @param kryo instance obtained from this pool.
- */
- public void putKryo(Kryo kryo) {
- if (kryo != null) {
- pool.add(kryo);
- }
- }
-
- /**
* Serializes given object to byte array using Kryo instance in pool.
* <p>
* Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
@@ -189,13 +233,13 @@
*/
public byte[] serialize(final Object obj, final int bufferSize) {
ByteBufferOutput out = new ByteBufferOutput(bufferSize, MAX_BUFFER_SIZE);
- Kryo kryo = getKryo();
+ Kryo kryo = borrow();
try {
kryo.writeClassAndObject(out, obj);
out.flush();
return out.toBytes();
} finally {
- putKryo(kryo);
+ release(kryo);
}
}
@@ -207,12 +251,40 @@
*/
public void serialize(final Object obj, final ByteBuffer buffer) {
ByteBufferOutput out = new ByteBufferOutput(buffer);
- Kryo kryo = getKryo();
+ Kryo kryo = borrow();
try {
kryo.writeClassAndObject(out, obj);
out.flush();
} finally {
- putKryo(kryo);
+ release(kryo);
+ }
+ }
+
+ /**
+ * Serializes given object to OutputStream using Kryo instance in pool.
+ *
+ * @param obj Object to serialize
+ * @param stream to write to
+ */
+ public void serialize(final Object obj, final OutputStream stream) {
+ serialize(obj, stream, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Serializes given object to OutputStream using Kryo instance in pool.
+ *
+ * @param obj Object to serialize
+ * @param stream to write to
+ * @param bufferSize size of the buffer in front of the stream
+ */
+ public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
+ ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
+ Kryo kryo = borrow();
+ try {
+ kryo.writeClassAndObject(out, obj);
+ out.flush();
+ } finally {
+ release(kryo);
}
}
@@ -225,13 +297,13 @@
*/
public <T> T deserialize(final byte[] bytes) {
Input in = new Input(bytes);
- Kryo kryo = getKryo();
+ Kryo kryo = borrow();
try {
@SuppressWarnings("unchecked")
T obj = (T) kryo.readClassAndObject(in);
return obj;
} finally {
- putKryo(kryo);
+ release(kryo);
}
}
@@ -244,18 +316,49 @@
*/
public <T> T deserialize(final ByteBuffer buffer) {
ByteBufferInput in = new ByteBufferInput(buffer);
- Kryo kryo = getKryo();
+ Kryo kryo = borrow();
try {
@SuppressWarnings("unchecked")
T obj = (T) kryo.readClassAndObject(in);
return obj;
} finally {
- putKryo(kryo);
+ release(kryo);
}
}
/**
- * Creates a Kryo instance with {@link #registeredTypes} pre-registered.
+ * Deserializes given InputStream to an Object using Kryo instance in pool.
+ *
+ * @param stream input stream
+ * @param <T> deserialized Object type
+ * @return deserialized Object
+ */
+ public <T> T deserialize(final InputStream stream) {
+ return deserialize(stream, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Deserializes given InputStream to an Object using Kryo instance in pool.
+ *
+ * @param stream input stream
+ * @param <T> deserialized Object type
+ * @return deserialized Object
+ * @param bufferSize size of the buffer in front of the stream
+ */
+ public <T> T deserialize(final InputStream stream, final int bufferSize) {
+ ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
+ Kryo kryo = borrow();
+ try {
+ @SuppressWarnings("unchecked")
+ T obj = (T) kryo.readClassAndObject(in);
+ return obj;
+ } finally {
+ release(kryo);
+ }
+ }
+
+ /**
+ * Creates a Kryo instance.
*
* @return Kryo instance
*/
@@ -263,42 +366,68 @@
public Kryo create() {
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(registrationRequired);
- for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
- final Serializer<?> serializer = registry.getRight();
- if (serializer == null) {
- kryo.register(registry.getLeft());
- } else {
- kryo.register(registry.getLeft(), serializer);
- if (serializer instanceof FamilySerializer) {
- FamilySerializer<?> fser = (FamilySerializer<?>) serializer;
- fser.registerFamilies(kryo);
+ for (RegistrationBlock block : registeredBlocks) {
+ int id = block.begin();
+ if (id == FLOATING_ID) {
+ id = kryo.getNextRegistrationId();
+ }
+ for (Pair<Class<?>, Serializer<?>> entry : block.types()) {
+ final Serializer<?> serializer = entry.getRight();
+ if (serializer == null) {
+ kryo.register(entry.getLeft(), id++);
+ } else {
+ kryo.register(entry.getLeft(), serializer, id++);
}
}
}
return kryo;
}
- /**
- * Serializer implementation, which required registration of family of Classes.
- * @param <T> base type of this serializer.
- */
- public abstract static class FamilySerializer<T> extends Serializer<T> {
+ @Override
+ public Kryo borrow() {
+ return pool.borrow();
+ }
+ @Override
+ public void release(Kryo kryo) {
+ pool.release(kryo);
+ }
- public FamilySerializer(boolean acceptsNull) {
- super(acceptsNull);
+ @Override
+ public <T> T run(KryoCallback<T> callback) {
+ return pool.run(callback);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("registeredBlocks", registeredBlocks)
+ .toString();
+ }
+
+ static final class RegistrationBlock {
+ private final int begin;
+ private final ImmutableList<Pair<Class<?>, Serializer<?>>> types;
+
+ public RegistrationBlock(int begin, List<Pair<Class<?>, Serializer<?>>> types) {
+ this.begin = begin;
+ this.types = ImmutableList.copyOf(types);
}
- public FamilySerializer(boolean acceptsNull, boolean immutable) {
- super(acceptsNull, immutable);
+ public int begin() {
+ return begin;
}
- /**
- * Registers other classes this Serializer supports.
- *
- * @param kryo instance to register classes to
- */
- public void registerFamilies(Kryo kryo) {
+ public ImmutableList<Pair<Class<?>, Serializer<?>>> types() {
+ return types;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("begin", begin)
+ .add("types", types)
+ .toString();
}
}
}