Make KryoFactory thread safe.

- Added static utility method to serialize/deserialze Object.

Change-Id: I3cad1cb957a16fab4aa087e958262f4d0a5c4b67
diff --git a/src/main/java/net/onrc/onos/core/intent/runtime/PersistIntent.java b/src/main/java/net/onrc/onos/core/intent/runtime/PersistIntent.java
index 0e280ee..3322ce3 100644
--- a/src/main/java/net/onrc/onos/core/intent/runtime/PersistIntent.java
+++ b/src/main/java/net/onrc/onos/core/intent/runtime/PersistIntent.java
@@ -49,8 +49,7 @@
         table = DataStoreClient.getClient().getTable(INTENT_JOURNAL);
         stream = new ByteArrayOutputStream(1024);
         output = new Output(stream);
-        // FIXME Using KryoFactory only to register classes and not using the pool.
-        kryo = new KryoFactory(1).newKryo();
+        kryo = KryoFactory.newKryoObject();
     }
 
     private long getNextBlock() {
diff --git a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
index 69c0b1a..4341e4a 100644
--- a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
@@ -7,7 +7,11 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.annotation.concurrent.ThreadSafe;
 
 import net.floodlightcontroller.core.IFloodlightProviderService.Role;
 import net.floodlightcontroller.util.MACAddress;
@@ -64,14 +68,27 @@
 import net.onrc.onos.core.util.SwitchPort;
 
 import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 
 /**
  * Class factory for allocating Kryo instances for
  * serialization/deserialization of classes.
  */
+@ThreadSafe
 public class KryoFactory {
+
     private static final int DEFAULT_PREALLOCATIONS = 100;
-    private ArrayList<Kryo> kryoList = new ArrayList<Kryo>();
+    /**
+     * Default buffer size used for serialization.
+     *
+     * @see #serialize(Object)
+     */
+    public static final int DEFAULT_BUFFER_SIZE = 1 * 1000 * 1000;
+
+    private static final KryoFactory GLOBAL_POOL = new KryoFactory();
+
+    private ConcurrentLinkedQueue<Kryo> kryoList = new ConcurrentLinkedQueue<Kryo>();
 
     /**
      * Default constructor.
@@ -89,7 +106,6 @@
      */
     public KryoFactory(final int initialCapacity) {
         // Preallocate
-        kryoList.ensureCapacity(initialCapacity);
         for (int i = 0; i < initialCapacity; i++) {
             Kryo kryo = newKryoObject();
             kryoList.add(kryo);
@@ -102,7 +118,15 @@
      * @return the Kryo object.
      */
     public Kryo newKryo() {
-        return newDeleteKryo(null);
+        Kryo kryo = kryoList.poll();
+        if (kryo == null) {
+            // Will defer additional allocation until deleteKryo().
+            // It is more likely that it is no longer latency sensitive
+            // by the time caller is recycling Kryo instance.
+            return newKryoObject();
+        } else {
+            return kryo;
+        }
     }
 
     /**
@@ -110,35 +134,18 @@
      *
      * @param deleteKryo the object to delete.
      */
-    public void deleteKryo(Kryo deleteKryo) {
-        newDeleteKryo(deleteKryo);
-    }
-
-    /**
-     * Creates or deletes a Kryo object.
-     *
-     * @param deleteKryo if null, then allocate and return a new object,
-     *                   otherwise delete the provided object.
-     * @return a new Kryo object if needed, otherwise null.
-     */
-    private synchronized Kryo newDeleteKryo(Kryo deleteKryo) {
-        if (deleteKryo != null) {
-            // Delete an entry by moving it back to the buffer
-            kryoList.add(deleteKryo);
-            return null;
-        } else {
-            Kryo kryo = null;
-            if (kryoList.isEmpty()) {
-                // Preallocate
-                for (int i = 0; i < 100; i++) {
-                    kryo = newKryoObject();
-                    kryoList.add(kryo);
-                }
+    public void deleteKryo(final Kryo deleteKryo) {
+        if (kryoList.isEmpty()) {
+            // buffer extra if kryo instance pool has exhausted.
+            List<Kryo> kryos = new ArrayList<>(DEFAULT_PREALLOCATIONS);
+            for (int i = 0; i < DEFAULT_PREALLOCATIONS; ++i) {
+                kryos.add(newKryoObject());
             }
-
-            kryo = kryoList.remove(kryoList.size() - 1);
-            return kryo;
+            kryoList.addAll(kryos);
         }
+
+        // recycle
+        kryoList.add(deleteKryo);
     }
 
     /**
@@ -251,4 +258,53 @@
 
         return kryo;
     }
+
+    /**
+     * Serializes given object to byte array using Kryo instance in global pool.
+     * <p>
+     * Note: Serialized bytes must be smaller than DEFAULT_BUFFER_SIZE (=1MB).
+     *
+     * @param obj Object to serialize (Class must be registered to KryoFactory)
+     * @return serialized bytes
+     */
+    public static byte[] serialize(final Object obj) {
+        return serialize(obj, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Serializes given object to byte array using Kryo instance in global pool.
+     *
+     * @param obj Object to serialize (Class must be registered to KryoFactory)
+     * @param bufferSize maximum size of serialized bytes
+     * @return serialized bytes
+     */
+    public static byte[] serialize(final Object obj, final int bufferSize) {
+        Output out = new Output(bufferSize);
+        Kryo kryo = GLOBAL_POOL.newKryo();
+        try {
+            kryo.writeClassAndObject(out, obj);
+            return out.toBytes();
+        } finally {
+            GLOBAL_POOL.deleteKryo(kryo);
+        }
+    }
+
+    /**
+     * Deserializes given byte array to Object using Kryo instance in global pool.
+     *
+     * @param bytes serialized bytes
+     * @param <T> deserialized Object type
+     * @return deserialized Object (Class must be registered to KryoFactory)
+     */
+    public static <T> T deserialize(final byte[] bytes) {
+        Input in = new Input(bytes);
+        Kryo kryo = GLOBAL_POOL.newKryo();
+        try {
+            @SuppressWarnings("unchecked")
+            T obj = (T) kryo.readClassAndObject(in);
+            return obj;
+        } finally {
+            GLOBAL_POOL.deleteKryo(kryo);
+        }
+    }
 }
diff --git a/src/test/java/net/onrc/onos/core/util/serializers/KryoFactoryTest.java b/src/test/java/net/onrc/onos/core/util/serializers/KryoFactoryTest.java
index f5e196d..38a37a0 100644
--- a/src/test/java/net/onrc/onos/core/util/serializers/KryoFactoryTest.java
+++ b/src/test/java/net/onrc/onos/core/util/serializers/KryoFactoryTest.java
@@ -6,7 +6,9 @@
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import net.floodlightcontroller.util.MACAddress;
 import net.onrc.onos.core.topology.HostEvent;
@@ -27,6 +29,7 @@
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Sets;
 
 /**
  * Tests to capture Kryo serialization characteristics.
@@ -59,6 +62,87 @@
     }
 
     /**
+     * Test case to check recycling behavior of KryoFactory.
+     */
+    @Test
+    public void testReallocation() {
+        final int poolSize = 3;
+        KryoFactory pool = new KryoFactory(poolSize);
+
+
+        // getting Kryo instance smaller than pool size should work just fine
+        Set<Kryo> kryos = new HashSet<>();
+        for (int i = 0; i < poolSize - 1; ++i) {
+            Kryo kryo = pool.newKryo();
+            assertNotNull(kryo);
+            assertTrue("KryoFactory should return unique instances",
+                        kryos.add(kryo));
+        }
+
+        // recycle Kryo instance
+        for (Kryo kryo : kryos) {
+            pool.deleteKryo(kryo);
+        }
+
+
+        // recycling behavior check
+        Set<Kryo> kryos2 = new HashSet<>();
+        for (int i = 0; i < poolSize - 1; ++i) {
+            Kryo kryo = pool.newKryo();
+            assertNotNull(kryo);
+            assertTrue("KryoFactory should return unique instances",
+                        kryos2.add(kryo));
+        }
+        // should at least have some recycled instances
+        assertTrue("Kryo instances should be reused after deleting",
+                    !Sets.difference(kryos2, kryos).isEmpty());
+
+        for (Kryo kryo : kryos2) {
+            pool.deleteKryo(kryo);
+        }
+
+
+        // pool expansion behavior check
+        Set<Kryo> kryos3 = new HashSet<>();
+        // it should be able to get Kryo instances larger than pool size set
+        for (int i = 0; i < poolSize * 2; ++i) {
+            Kryo kryo = pool.newKryo();
+            assertNotNull(kryo);
+            assertTrue("KryoFactory should return unique instances",
+                        kryos3.add(kryo));
+        }
+        // recycle Kryo instance (should trigger pool expansion)
+        for (Kryo kryo : kryos3) {
+            pool.deleteKryo(kryo);
+        }
+
+        // should at least have some Kryo instances we haven't seen initially.
+        assertTrue("New Kryo instances should be added to the pool",
+                !Sets.difference(kryos3, kryos).isEmpty());
+    }
+
+    /**
+     * Tests static serialize/deserialize methods.
+     */
+    @Test
+    public void testStaticSerializeDeserialize() {
+
+        final List<Object> objects = new ArrayList<>();
+        Dpid dpid1 = new Dpid(1);
+        PortNumber port10 = PortNumber.uint32(10);
+        SwitchPort switchPort = new SwitchPort(dpid1, port10);
+
+        objects.add(dpid1);
+        objects.add(port10);
+        objects.add(switchPort);
+
+        final byte[] bytes = KryoFactory.serialize(objects);
+        final List<Object> deserialized = KryoFactory.deserialize(bytes);
+
+        assertEquals(objects, deserialized);
+    }
+
+    /**
      * Benchmark result.
      */
     private static final class Result {