Make KryoFactory thread safe.
- Added static utility method to serialize/deserialze Object.
Change-Id: I3cad1cb957a16fab4aa087e958262f4d0a5c4b67
diff --git a/src/main/java/net/onrc/onos/core/intent/runtime/PersistIntent.java b/src/main/java/net/onrc/onos/core/intent/runtime/PersistIntent.java
index 0e280ee..3322ce3 100644
--- a/src/main/java/net/onrc/onos/core/intent/runtime/PersistIntent.java
+++ b/src/main/java/net/onrc/onos/core/intent/runtime/PersistIntent.java
@@ -49,8 +49,7 @@
table = DataStoreClient.getClient().getTable(INTENT_JOURNAL);
stream = new ByteArrayOutputStream(1024);
output = new Output(stream);
- // FIXME Using KryoFactory only to register classes and not using the pool.
- kryo = new KryoFactory(1).newKryo();
+ kryo = KryoFactory.newKryoObject();
}
private long getNextBlock() {
diff --git a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
index 69c0b1a..4341e4a 100644
--- a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
@@ -7,7 +7,11 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.annotation.concurrent.ThreadSafe;
import net.floodlightcontroller.core.IFloodlightProviderService.Role;
import net.floodlightcontroller.util.MACAddress;
@@ -64,14 +68,27 @@
import net.onrc.onos.core.util.SwitchPort;
import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
/**
* Class factory for allocating Kryo instances for
* serialization/deserialization of classes.
*/
+@ThreadSafe
public class KryoFactory {
+
private static final int DEFAULT_PREALLOCATIONS = 100;
- private ArrayList<Kryo> kryoList = new ArrayList<Kryo>();
+ /**
+ * Default buffer size used for serialization.
+ *
+ * @see #serialize(Object)
+ */
+ public static final int DEFAULT_BUFFER_SIZE = 1 * 1000 * 1000;
+
+ private static final KryoFactory GLOBAL_POOL = new KryoFactory();
+
+ private ConcurrentLinkedQueue<Kryo> kryoList = new ConcurrentLinkedQueue<Kryo>();
/**
* Default constructor.
@@ -89,7 +106,6 @@
*/
public KryoFactory(final int initialCapacity) {
// Preallocate
- kryoList.ensureCapacity(initialCapacity);
for (int i = 0; i < initialCapacity; i++) {
Kryo kryo = newKryoObject();
kryoList.add(kryo);
@@ -102,7 +118,15 @@
* @return the Kryo object.
*/
public Kryo newKryo() {
- return newDeleteKryo(null);
+ Kryo kryo = kryoList.poll();
+ if (kryo == null) {
+ // Will defer additional allocation until deleteKryo().
+ // It is more likely that it is no longer latency sensitive
+ // by the time caller is recycling Kryo instance.
+ return newKryoObject();
+ } else {
+ return kryo;
+ }
}
/**
@@ -110,35 +134,18 @@
*
* @param deleteKryo the object to delete.
*/
- public void deleteKryo(Kryo deleteKryo) {
- newDeleteKryo(deleteKryo);
- }
-
- /**
- * Creates or deletes a Kryo object.
- *
- * @param deleteKryo if null, then allocate and return a new object,
- * otherwise delete the provided object.
- * @return a new Kryo object if needed, otherwise null.
- */
- private synchronized Kryo newDeleteKryo(Kryo deleteKryo) {
- if (deleteKryo != null) {
- // Delete an entry by moving it back to the buffer
- kryoList.add(deleteKryo);
- return null;
- } else {
- Kryo kryo = null;
- if (kryoList.isEmpty()) {
- // Preallocate
- for (int i = 0; i < 100; i++) {
- kryo = newKryoObject();
- kryoList.add(kryo);
- }
+ public void deleteKryo(final Kryo deleteKryo) {
+ if (kryoList.isEmpty()) {
+ // buffer extra if kryo instance pool has exhausted.
+ List<Kryo> kryos = new ArrayList<>(DEFAULT_PREALLOCATIONS);
+ for (int i = 0; i < DEFAULT_PREALLOCATIONS; ++i) {
+ kryos.add(newKryoObject());
}
-
- kryo = kryoList.remove(kryoList.size() - 1);
- return kryo;
+ kryoList.addAll(kryos);
}
+
+ // recycle
+ kryoList.add(deleteKryo);
}
/**
@@ -251,4 +258,53 @@
return kryo;
}
+
+ /**
+ * Serializes given object to byte array using Kryo instance in global pool.
+ * <p>
+ * Note: Serialized bytes must be smaller than DEFAULT_BUFFER_SIZE (=1MB).
+ *
+ * @param obj Object to serialize (Class must be registered to KryoFactory)
+ * @return serialized bytes
+ */
+ public static byte[] serialize(final Object obj) {
+ return serialize(obj, DEFAULT_BUFFER_SIZE);
+ }
+
+ /**
+ * Serializes given object to byte array using Kryo instance in global pool.
+ *
+ * @param obj Object to serialize (Class must be registered to KryoFactory)
+ * @param bufferSize maximum size of serialized bytes
+ * @return serialized bytes
+ */
+ public static byte[] serialize(final Object obj, final int bufferSize) {
+ Output out = new Output(bufferSize);
+ Kryo kryo = GLOBAL_POOL.newKryo();
+ try {
+ kryo.writeClassAndObject(out, obj);
+ return out.toBytes();
+ } finally {
+ GLOBAL_POOL.deleteKryo(kryo);
+ }
+ }
+
+ /**
+ * Deserializes given byte array to Object using Kryo instance in global pool.
+ *
+ * @param bytes serialized bytes
+ * @param <T> deserialized Object type
+ * @return deserialized Object (Class must be registered to KryoFactory)
+ */
+ public static <T> T deserialize(final byte[] bytes) {
+ Input in = new Input(bytes);
+ Kryo kryo = GLOBAL_POOL.newKryo();
+ try {
+ @SuppressWarnings("unchecked")
+ T obj = (T) kryo.readClassAndObject(in);
+ return obj;
+ } finally {
+ GLOBAL_POOL.deleteKryo(kryo);
+ }
+ }
}