Make KryoFactory thread safe.

- Added static utility methods to serialize/deserialize an Object.

Change-Id: I3cad1cb957a16fab4aa087e958262f4d0a5c4b67
diff --git a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
index 69c0b1a..4341e4a 100644
--- a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
@@ -7,7 +7,11 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.annotation.concurrent.ThreadSafe;
 
 import net.floodlightcontroller.core.IFloodlightProviderService.Role;
 import net.floodlightcontroller.util.MACAddress;
@@ -64,14 +68,27 @@
 import net.onrc.onos.core.util.SwitchPort;
 
 import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 
 /**
  * Class factory for allocating Kryo instances for
  * serialization/deserialization of classes.
  */
+@ThreadSafe
 public class KryoFactory {
+
     private static final int DEFAULT_PREALLOCATIONS = 100;
-    private ArrayList<Kryo> kryoList = new ArrayList<Kryo>();
+    /**
+     * Default buffer size used for serialization.
+     *
+     * @see #serialize(Object)
+     */
+    public static final int DEFAULT_BUFFER_SIZE = 1 * 1000 * 1000;
+
+    private static final KryoFactory GLOBAL_POOL = new KryoFactory();
+
+    private ConcurrentLinkedQueue<Kryo> kryoList = new ConcurrentLinkedQueue<Kryo>();
 
     /**
      * Default constructor.
@@ -89,7 +106,6 @@
      */
     public KryoFactory(final int initialCapacity) {
         // Preallocate
-        kryoList.ensureCapacity(initialCapacity);
         for (int i = 0; i < initialCapacity; i++) {
             Kryo kryo = newKryoObject();
             kryoList.add(kryo);
@@ -102,7 +118,15 @@
      * @return the Kryo object.
      */
     public Kryo newKryo() {
-        return newDeleteKryo(null);
+        Kryo kryo = kryoList.poll();
+        if (kryo == null) {
+            // Pool is empty: hand out one freshly built instance and leave the
+            // batch refill to deleteKryo(). Presumably the caller is less
+            // latency sensitive by the time it recycles the Kryo instance.
+            return newKryoObject();
+        } else {
+            return kryo;
+        }
     }
 
     /**
@@ -110,35 +134,18 @@
      *
      * @param deleteKryo the object to delete.
      */
-    public void deleteKryo(Kryo deleteKryo) {
-        newDeleteKryo(deleteKryo);
-    }
-
-    /**
-     * Creates or deletes a Kryo object.
-     *
-     * @param deleteKryo if null, then allocate and return a new object,
-     *                   otherwise delete the provided object.
-     * @return a new Kryo object if needed, otherwise null.
-     */
-    private synchronized Kryo newDeleteKryo(Kryo deleteKryo) {
-        if (deleteKryo != null) {
-            // Delete an entry by moving it back to the buffer
-            kryoList.add(deleteKryo);
-            return null;
-        } else {
-            Kryo kryo = null;
-            if (kryoList.isEmpty()) {
-                // Preallocate
-                for (int i = 0; i < 100; i++) {
-                    kryo = newKryoObject();
-                    kryoList.add(kryo);
-                }
+    public void deleteKryo(final Kryo deleteKryo) {
+        if (kryoList.isEmpty()) {
+            // Pool is exhausted: build a fresh batch before recycling.
+            List<Kryo> kryos = new ArrayList<>(DEFAULT_PREALLOCATIONS);
+            for (int i = 0; i < DEFAULT_PREALLOCATIONS; ++i) {
+                kryos.add(newKryoObject());
             }
-
-            kryo = kryoList.remove(kryoList.size() - 1);
-            return kryo;
+            kryoList.addAll(kryos);
         }
+
+        // Return the caller's instance to the pool for reuse.
+        kryoList.add(deleteKryo);
     }
 
     /**
@@ -251,4 +258,53 @@
 
         return kryo;
     }
+
+    /**
+     * Serializes the given object to a byte array using a Kryo instance from the global pool.
+     * <p>
+     * Note: the serialized bytes must be smaller than {@link #DEFAULT_BUFFER_SIZE} (1 MB).
+     *
+     * @param obj object to serialize (its class must be registered with KryoFactory)
+     * @return serialized bytes
+     */
+    public static byte[] serialize(final Object obj) {
+        return serialize(obj, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Serializes the given object to a byte array using a Kryo instance from the global pool.
+     *
+     * @param obj object to serialize (its class must be registered with KryoFactory)
+     * @param bufferSize maximum size of the serialized bytes
+     * @return serialized bytes
+     */
+    public static byte[] serialize(final Object obj, final int bufferSize) {
+        Output out = new Output(bufferSize);
+        Kryo kryo = GLOBAL_POOL.newKryo();
+        try {
+            kryo.writeClassAndObject(out, obj);
+            return out.toBytes();
+        } finally {
+            GLOBAL_POOL.deleteKryo(kryo);
+        }
+    }
+
+    /**
+     * Deserializes the given byte array to an object using a Kryo instance from the global pool.
+     *
+     * @param bytes serialized bytes
+     * @param <T> type of the deserialized object
+     * @return deserialized object (its class must be registered with KryoFactory)
+     */
+    public static <T> T deserialize(final byte[] bytes) {
+        Input in = new Input(bytes);
+        Kryo kryo = GLOBAL_POOL.newKryo();
+        try {
+            @SuppressWarnings("unchecked")
+            T obj = (T) kryo.readClassAndObject(in);
+            return obj;
+        } finally {
+            GLOBAL_POOL.deleteKryo(kryo);
+        }
+    }
 }