Kryo related fixes

- Extend KryoNamespace to allow control over Kryo registration IDs

Change-Id: Idc2a0e27a09916657c725ee97e4366109144cc66
diff --git a/utils/misc/src/main/java/org/onlab/util/KryoNamespace.java b/utils/misc/src/main/java/org/onlab/util/KryoNamespace.java
index b89544e..63d5363 100644
--- a/utils/misc/src/main/java/org/onlab/util/KryoNamespace.java
+++ b/utils/misc/src/main/java/org/onlab/util/KryoNamespace.java
@@ -15,10 +15,11 @@
  */
 package org.onlab.util;
 
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -27,15 +28,17 @@
 import com.esotericsoftware.kryo.io.ByteBufferInput;
 import com.esotericsoftware.kryo.io.ByteBufferOutput;
 import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.pool.KryoCallback;
 import com.esotericsoftware.kryo.pool.KryoFactory;
+import com.esotericsoftware.kryo.pool.KryoPool;
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 
-// TODO Add tests for this class.
 /**
  * Pool of Kryo instances, with classes pre-registered.
  */
 //@ThreadSafe
-public final class KryoNamespace implements KryoFactory {
+public final class KryoNamespace implements KryoFactory, KryoPool {
 
     /**
      * Default buffer size used for serialization.
@@ -45,17 +48,35 @@
     public static final int DEFAULT_BUFFER_SIZE = 4096;
     public static final int MAX_BUFFER_SIZE = 100 * 1000 * 1000;
 
-    private final ConcurrentLinkedQueue<Kryo> pool = new ConcurrentLinkedQueue<>();
-    private final ImmutableList<Pair<Class<?>, Serializer<?>>> registeredTypes;
+    /**
+     * ID to use if this KryoNamespace does not define registration id.
+     */
+    public static final int FLOATING_ID = -1;
+
+    /**
+     * Smallest ID free to use for user defined registrations.
+     */
+    public static final int INITIAL_ID = 11;
+
+
+    private final KryoPool pool = new KryoPool.Builder(this)
+                                        .softReferences()
+                                        .build();
+
+    private final ImmutableList<RegistrationBlock> registeredBlocks;
+
     private final boolean registrationRequired;
 
+
     /**
      * KryoNamespace builder.
      */
     //@NotThreadSafe
     public static final class Builder {
 
-        private final List<Pair<Class<?>, Serializer<?>>> types = new ArrayList<>();
+        private int blockHeadId = INITIAL_ID;
+        private List<Pair<Class<?>, Serializer<?>>> types = new ArrayList<>();
+        private List<RegistrationBlock> blocks = new ArrayList<>();
         private boolean registrationRequired = true;
 
         /**
@@ -64,7 +85,27 @@
          * @return KryoNamespace
          */
         public KryoNamespace build() {
-            return new KryoNamespace(types, registrationRequired);
+            if (!types.isEmpty()) {
+                blocks.add(new RegistrationBlock(this.blockHeadId, types));
+            }
+            return new KryoNamespace(blocks, registrationRequired).populate(1);
+        }
+
+        /**
+         * Sets the next Kryo registration Id for following register entries.
+         *
+         * @param id Kryo registration Id
+         * @return this
+         *
+         * @see Kryo#register(Class, Serializer, int)
+         */
+        public Builder nextId(final int id) {
+            if (!types.isEmpty()) {
+                blocks.add(new RegistrationBlock(this.blockHeadId, types));
+                types = new ArrayList<>();
+            }
+            this.blockHeadId = id;
+            return this;
         }
 
         /**
@@ -75,7 +116,7 @@
          */
         public Builder register(final Class<?>... expectedTypes) {
             for (Class<?> clazz : expectedTypes) {
-                types.add(Pair.<Class<?>, Serializer<?>>of(clazz, null));
+                types.add(Pair.of(clazz, null));
             }
             return this;
         }
@@ -83,26 +124,54 @@
         /**
          * Registers a class and it's serializer.
          *
-         * @param clazz the class to register
+         * @param classes list of classes to register
          * @param serializer serializer to use for the class
          * @return this
          */
-        public Builder register(final Class<?> clazz, Serializer<?> serializer) {
-            types.add(Pair.<Class<?>, Serializer<?>>of(clazz, serializer));
+        public Builder register(Serializer<?> serializer, final Class<?>... classes) {
+            for (Class<?> clazz : classes) {
+                types.add(Pair.of(clazz, serializer));
+            }
+            return this;
+        }
+
+        private Builder register(RegistrationBlock block) {
+            if (block.begin() != FLOATING_ID) {
+                // flush pending types
+                nextId(block.begin());
+                blocks.add(block);
+                nextId(block.begin() + block.types().size());
+            } else {
+                // flush pending types; the floating block is placed right after them
+                final int addedBlockBegin = blockHeadId + types.size();
+                nextId(addedBlockBegin);
+                blocks.add(new RegistrationBlock(addedBlockBegin, block.types()));
+                nextId(addedBlockBegin + block.types().size());
+            }
             return this;
         }
 
         /**
          * Registers all the class registered to given KryoNamespace.
          *
-         * @param pool KryoNamespace
+         * @param ns KryoNamespace
          * @return this
          */
-        public Builder register(final KryoNamespace pool) {
-            types.addAll(pool.registeredTypes);
+        public Builder register(final KryoNamespace ns) {
+            for (RegistrationBlock block : ns.registeredBlocks) {
+                this.register(block);
+            }
             return this;
         }
 
+        /**
+         * Sets the registrationRequired flag.
+         *
+         * @param registrationRequired Kryo's registrationRequired flag
+         * @return this
+         *
+         * @see Kryo#setRegistrationRequired(boolean)
+         */
         public Builder setRegistrationRequired(boolean registrationRequired) {
             this.registrationRequired = registrationRequired;
             return this;
@@ -124,8 +193,8 @@
      * @param registeredTypes types to register
      * @param registrationRequired
      */
-    private KryoNamespace(final List<Pair<Class<?>, Serializer<?>>> registeredTypes, boolean registrationRequired) {
-        this.registeredTypes = ImmutableList.copyOf(registeredTypes);
+    private KryoNamespace(final List<RegistrationBlock> registeredTypes, boolean registrationRequired) {
+        this.registeredBlocks = ImmutableList.copyOf(registeredTypes);
         this.registrationRequired = registrationRequired;
     }
 
@@ -136,39 +205,14 @@
      * @return this
      */
     public KryoNamespace populate(int instances) {
-        List<Kryo> kryos = new ArrayList<>(instances);
+
         for (int i = 0; i < instances; ++i) {
-            kryos.add(create());
+            release(create());
         }
-        pool.addAll(kryos);
         return this;
     }
 
     /**
-     * Gets a Kryo instance from the pool.
-     *
-     * @return Kryo instance
-     */
-    public Kryo getKryo() {
-        Kryo kryo = pool.poll();
-        if (kryo == null) {
-            return create();
-        }
-        return kryo;
-    }
-
-    /**
-     * Returns a Kryo instance to the pool.
-     *
-     * @param kryo instance obtained from this pool.
-     */
-    public void putKryo(Kryo kryo) {
-        if (kryo != null) {
-            pool.add(kryo);
-        }
-    }
-
-    /**
      * Serializes given object to byte array using Kryo instance in pool.
      * <p>
      * Note: Serialized bytes must be smaller than {@link #MAX_BUFFER_SIZE}.
@@ -189,13 +233,13 @@
      */
     public byte[] serialize(final Object obj, final int bufferSize) {
         ByteBufferOutput out = new ByteBufferOutput(bufferSize, MAX_BUFFER_SIZE);
-        Kryo kryo = getKryo();
+        Kryo kryo = borrow();
         try {
             kryo.writeClassAndObject(out, obj);
             out.flush();
             return out.toBytes();
         } finally {
-            putKryo(kryo);
+            release(kryo);
         }
     }
 
@@ -207,12 +251,40 @@
      */
     public void serialize(final Object obj, final ByteBuffer buffer) {
         ByteBufferOutput out = new ByteBufferOutput(buffer);
-        Kryo kryo = getKryo();
+        Kryo kryo = borrow();
         try {
             kryo.writeClassAndObject(out, obj);
             out.flush();
         } finally {
-            putKryo(kryo);
+            release(kryo);
+        }
+    }
+
+    /**
+     * Serializes given object to OutputStream using Kryo instance in pool.
+     *
+     * @param obj Object to serialize
+     * @param stream to write to
+     */
+    public void serialize(final Object obj, final OutputStream stream) {
+        serialize(obj, stream, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Serializes given object to OutputStream using Kryo instance in pool.
+     *
+     * @param obj Object to serialize
+     * @param stream to write to
+     * @param bufferSize size of the buffer in front of the stream
+     */
+    public void serialize(final Object obj, final OutputStream stream, final int bufferSize) {
+        ByteBufferOutput out = new ByteBufferOutput(stream, bufferSize);
+        Kryo kryo = borrow();
+        try {
+            kryo.writeClassAndObject(out, obj);
+            out.flush();
+        } finally {
+            release(kryo);
         }
     }
 
@@ -225,13 +297,13 @@
      */
     public <T> T deserialize(final byte[] bytes) {
         Input in = new Input(bytes);
-        Kryo kryo = getKryo();
+        Kryo kryo = borrow();
         try {
             @SuppressWarnings("unchecked")
             T obj = (T) kryo.readClassAndObject(in);
             return obj;
         } finally {
-            putKryo(kryo);
+            release(kryo);
         }
     }
 
@@ -244,18 +316,49 @@
      */
     public <T> T deserialize(final ByteBuffer buffer) {
         ByteBufferInput in = new ByteBufferInput(buffer);
-        Kryo kryo = getKryo();
+        Kryo kryo = borrow();
         try {
             @SuppressWarnings("unchecked")
             T obj = (T) kryo.readClassAndObject(in);
             return obj;
         } finally {
-            putKryo(kryo);
+            release(kryo);
         }
     }
 
     /**
-     * Creates a Kryo instance with {@link #registeredTypes} pre-registered.
+     * Deserializes given InputStream to an Object using Kryo instance in pool.
+     *
+     * @param stream input stream
+     * @param <T> deserialized Object type
+     * @return deserialized Object
+     */
+    public <T> T deserialize(final InputStream stream) {
+        return deserialize(stream, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Deserializes given InputStream to an Object using Kryo instance in pool.
+     *
+     * @param stream input stream
+     * @param bufferSize size of the buffer in front of the stream
+     * @param <T> deserialized Object type
+     * @return deserialized Object
+     */
+    public <T> T deserialize(final InputStream stream, final int bufferSize) {
+        ByteBufferInput in = new ByteBufferInput(stream, bufferSize);
+        Kryo kryo = borrow();
+        try {
+            @SuppressWarnings("unchecked")
+            T obj = (T) kryo.readClassAndObject(in);
+            return obj;
+        } finally {
+            release(kryo);
+        }
+    }
+
+    /**
+     * Creates a Kryo instance with {@link #registeredBlocks} pre-registered.
      *
      * @return Kryo instance
      */
@@ -263,42 +366,68 @@
     public Kryo create() {
         Kryo kryo = new Kryo();
         kryo.setRegistrationRequired(registrationRequired);
-        for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
-            final Serializer<?> serializer = registry.getRight();
-            if (serializer == null) {
-                kryo.register(registry.getLeft());
-            } else {
-                kryo.register(registry.getLeft(), serializer);
-                if (serializer instanceof FamilySerializer) {
-                    FamilySerializer<?> fser = (FamilySerializer<?>) serializer;
-                    fser.registerFamilies(kryo);
+        for (RegistrationBlock block : registeredBlocks) {
+            int id = block.begin();
+            if (id == FLOATING_ID) {
+                id = kryo.getNextRegistrationId();
+            }
+            for (Pair<Class<?>, Serializer<?>> entry : block.types()) {
+                final Serializer<?> serializer = entry.getRight();
+                if (serializer == null) {
+                    kryo.register(entry.getLeft(), id++);
+                } else {
+                    kryo.register(entry.getLeft(), serializer, id++);
                 }
             }
         }
         return kryo;
     }
 
-    /**
-     * Serializer implementation, which required registration of family of Classes.
-     * @param <T> base type of this serializer.
-     */
-    public abstract static class FamilySerializer<T> extends Serializer<T> {
+    @Override
+    public Kryo borrow() {
+        return pool.borrow();
+    }
 
+    @Override
+    public void release(Kryo kryo) {
+        pool.release(kryo);
+    }
 
-        public FamilySerializer(boolean acceptsNull) {
-            super(acceptsNull);
+    @Override
+    public <T> T run(KryoCallback<T> callback) {
+        return pool.run(callback);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                    .add("registeredBlocks", registeredBlocks)
+                    .toString();
+    }
+
+    static final class RegistrationBlock {
+        private final int begin;
+        private final ImmutableList<Pair<Class<?>, Serializer<?>>> types;
+
+        public RegistrationBlock(int begin, List<Pair<Class<?>, Serializer<?>>> types) {
+            this.begin = begin;
+            this.types = ImmutableList.copyOf(types);
         }
 
-        public FamilySerializer(boolean acceptsNull, boolean immutable) {
-            super(acceptsNull, immutable);
+        public int begin() {
+            return begin;
         }
 
-        /**
-         * Registers other classes this Serializer supports.
-         *
-         * @param kryo instance to register classes to
-         */
-        public void registerFamilies(Kryo kryo) {
+        public ImmutableList<Pair<Class<?>, Serializer<?>>> types() {
+            return types;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("begin", begin)
+                    .add("types", types)
+                    .toString();
         }
     }
 }