Kryo related changes.

Change-Id: I5b4fab63d6ece084b65aa712971a22d953d0caf0
diff --git a/core/api/src/main/java/org/onlab/onos/net/DeviceId.java b/core/api/src/main/java/org/onlab/onos/net/DeviceId.java
index ef8c5ab..8d96d8b 100644
--- a/core/api/src/main/java/org/onlab/onos/net/DeviceId.java
+++ b/core/api/src/main/java/org/onlab/onos/net/DeviceId.java
@@ -7,6 +7,9 @@
  */
 public final class DeviceId extends ElementId {
 
+    // Default constructor for serialization
+    protected DeviceId() {}
+
     // Public construction is prohibited
     private DeviceId(URI uri) {
         super(uri);
diff --git a/core/api/src/main/java/org/onlab/onos/net/ElementId.java b/core/api/src/main/java/org/onlab/onos/net/ElementId.java
index e205bb6..a75f11e 100644
--- a/core/api/src/main/java/org/onlab/onos/net/ElementId.java
+++ b/core/api/src/main/java/org/onlab/onos/net/ElementId.java
@@ -10,6 +10,11 @@
 
     private final URI uri;
 
+    // Default constructor for serialization
+    protected ElementId() {
+        this.uri = null;
+    }
+
     /**
      * Creates an element identifier using the supplied URI.
      *
diff --git a/core/api/src/main/java/org/onlab/onos/net/provider/ProviderId.java b/core/api/src/main/java/org/onlab/onos/net/provider/ProviderId.java
index e9af0eb..5fc0150 100644
--- a/core/api/src/main/java/org/onlab/onos/net/provider/ProviderId.java
+++ b/core/api/src/main/java/org/onlab/onos/net/provider/ProviderId.java
@@ -12,6 +12,12 @@
     private final String scheme;
     private final String id;
 
+    // Default constructor for serialization
+    protected ProviderId() {
+        scheme = null;
+        id = null;
+    }
+
     /**
      * Creates a new provider identifier from the specified string.
      * The providers are expected to follow the reverse DNS convention, e.g.
diff --git a/core/store/pom.xml b/core/store/pom.xml
index cf01c5a..246355c 100644
--- a/core/store/pom.xml
+++ b/core/store/pom.xml
@@ -25,6 +25,14 @@
             <groupId>org.apache.felix</groupId>
             <artifactId>org.apache.felix.scr.annotations</artifactId>
         </dependency>
+        <dependency>
+          <groupId>com.hazelcast</groupId>
+          <artifactId>hazelcast</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>de.javakaffee</groupId>
+          <artifactId>kryo-serializers</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/core/store/src/main/java/org/onlab/onos/store/device/impl/DefaultPortSerializer.java b/core/store/src/main/java/org/onlab/onos/store/device/impl/DefaultPortSerializer.java
new file mode 100644
index 0000000..4a60eb8
--- /dev/null
+++ b/core/store/src/main/java/org/onlab/onos/store/device/impl/DefaultPortSerializer.java
@@ -0,0 +1,53 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Element;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.packet.IpPrefix;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.google.common.collect.ImmutableSet;
+
+// TODO move to util, etc.
+public final class DefaultPortSerializer extends
+        Serializer<DefaultPort> {
+
+    private final CollectionSerializer ipAddrSerializer
+        = new CollectionSerializer(IpPrefix.class,
+                            new IpPrefixSerializer(), false);
+
+    public DefaultPortSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, DefaultPort object) {
+        kryo.writeClassAndObject(output, object.element());
+        kryo.writeObject(output, object.number());
+        output.writeBoolean(object.isEnabled());
+        kryo.writeObject(output, object.ipAddresses(),
+                ipAddrSerializer);
+    }
+
+    @Override
+    public DefaultPort read(Kryo kryo, Input input,
+            Class<DefaultPort> type) {
+        Element element = (Element) kryo.readClassAndObject(input);
+        PortNumber number = kryo.readObject(input, PortNumber.class);
+        boolean isEnabled = input.readBoolean();
+        @SuppressWarnings("unchecked")
+        Collection<IpPrefix> ipAddresses = kryo.readObject(
+                    input, ArrayList.class, ipAddrSerializer);
+
+        return new DefaultPort(element, number, isEnabled,
+                            ImmutableSet.copyOf(ipAddresses));
+    }
+}
diff --git a/core/store/src/main/java/org/onlab/onos/store/device/impl/IpPrefixSerializer.java b/core/store/src/main/java/org/onlab/onos/store/device/impl/IpPrefixSerializer.java
new file mode 100644
index 0000000..3d3efe0
--- /dev/null
+++ b/core/store/src/main/java/org/onlab/onos/store/device/impl/IpPrefixSerializer.java
@@ -0,0 +1,36 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.packet.IpPrefix;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+// TODO move to util, etc.
+public final class IpPrefixSerializer extends Serializer<IpPrefix> {
+
+    public IpPrefixSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output,
+            IpPrefix object) {
+        byte[] octs = object.toOctets();
+        output.writeInt(octs.length);
+        output.writeBytes(octs);
+        output.writeInt(object.prefixLength());
+    }
+
+    @Override
+    public IpPrefix read(Kryo kryo, Input input,
+            Class<IpPrefix> type) {
+        int octLen = input.readInt();
+        byte[] octs = new byte[octLen];
+        input.read(octs);
+        int prefLen = input.readInt();
+        return IpPrefix.valueOf(octs, prefLen);
+    }
+}
diff --git a/core/store/src/main/java/org/onlab/onos/store/device/impl/PortNumberSerializer.java b/core/store/src/main/java/org/onlab/onos/store/device/impl/PortNumberSerializer.java
new file mode 100644
index 0000000..4483187
--- /dev/null
+++ b/core/store/src/main/java/org/onlab/onos/store/device/impl/PortNumberSerializer.java
@@ -0,0 +1,29 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.PortNumber;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+// TODO move to util, etc.
+public final class PortNumberSerializer extends
+        Serializer<PortNumber> {
+
+    public PortNumberSerializer() {
+        // non-null, immutable
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, PortNumber object) {
+        output.writeLong(object.toLong());
+    }
+
+    @Override
+    public PortNumber read(Kryo kryo, Input input,
+            Class<PortNumber> type) {
+        return PortNumber.portNumber(input.readLong());
+    }
+}
diff --git a/pom.xml b/pom.xml
index 4210599..18dd5b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,28 @@
                 <scope>provided</scope>
             </dependency>
 
+            <dependency>
+              <groupId>com.hazelcast</groupId>
+              <artifactId>hazelcast</artifactId>
+              <version>3.3</version>
+            </dependency>
+            <dependency>
+              <groupId>com.esotericsoftware.kryo</groupId>
+              <artifactId>kryo</artifactId>
+              <version>2.24.0</version>
+            </dependency>
+            <dependency>
+              <groupId>de.javakaffee</groupId>
+              <artifactId>kryo-serializers</artifactId>
+              <version>0.27</version>
+            </dependency>
+
+            <dependency>
+              <groupId>org.apache.commons</groupId>
+              <artifactId>commons-lang3</artifactId>
+              <version>3.3.2</version>
+            </dependency>
+
             <!-- ONOS related -->
             <dependency>
                 <groupId>org.onlab.onos</groupId>
diff --git a/utils/misc/pom.xml b/utils/misc/pom.xml
index 5b19401..b70bbe5 100644
--- a/utils/misc/pom.xml
+++ b/utils/misc/pom.xml
@@ -30,6 +30,14 @@
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
         </dependency>
+        <dependency>
+          <groupId>com.esotericsoftware.kryo</groupId>
+          <artifactId>kryo</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-lang3</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/utils/misc/src/main/java/org/onlab/util/KryoPool.java b/utils/misc/src/main/java/org/onlab/util/KryoPool.java
new file mode 100644
index 0000000..58c268c
--- /dev/null
+++ b/utils/misc/src/main/java/org/onlab/util/KryoPool.java
@@ -0,0 +1,213 @@
+package org.onlab.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.ImmutableList;
+
+// TODO Add tests for this class.
+/**
+ * Pool of Kryo instances, with classes pre-registered.
+ */
+//@ThreadSafe
+public final class KryoPool {
+
+    /**
+     * Default buffer size used for serialization.
+     *
+     * @see #serialize(Object)
+     */
+    public static final int DEFAULT_BUFFER_SIZE = 1 * 1000 * 1000;
+
+    private final ConcurrentLinkedQueue<Kryo> pool = new ConcurrentLinkedQueue<>();
+    private final ImmutableList<Pair<Class<?>, Serializer<?>>> registeredTypes;
+    private final boolean registrationRequired;
+
+    /**
+     * KryoPool builder.
+     */
+    //@NotThreadSafe
+    public static final class Builder {
+
+        private final List<Pair<Class<?>, Serializer<?>>> types = new ArrayList<>();
+
+        /**
+         * Builds a {@link KryoPool} instance.
+         *
+         * @return KryoPool
+         */
+        public KryoPool build() {
+            return new KryoPool(types);
+        }
+
+        /**
+         * Registers classes to be serialized using Kryo default serializer.
+         *
+         * @param expectedTypes list of classes
+         * @return this
+         */
+        public Builder register(final Class<?>... expectedTypes) {
+            for (Class<?> clazz : expectedTypes) {
+                types.add(Pair.<Class<?>, Serializer<?>>of(clazz, null));
+            }
+            return this;
+        }
+
+        /**
+         * Registers a class and it's serializer.
+         *
+         * @param clazz the class to register
+         * @param serializer serializer to use for the class
+         * @return this
+         */
+        public Builder register(final Class<?> clazz, Serializer<?> serializer) {
+            types.add(Pair.<Class<?>, Serializer<?>>of(clazz, serializer));
+            return this;
+        }
+
+        /**
+         * Registers all the class registered to given KryoPool.
+         *
+         * @param pool KryoPool
+         * @return this
+         */
+        public Builder register(final KryoPool pool) {
+            types.addAll(pool.registeredTypes);
+            return this;
+        }
+    }
+
+    /**
+     * Creates a new {@link KryoPool} builder.
+     *
+     * @return builder
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Creates a Kryo instance pool.
+     *
+     * @param registerdTypes types to register
+     */
+    private KryoPool(final List<Pair<Class<?>, Serializer<?>>> registerdTypes) {
+        this.registeredTypes = ImmutableList.copyOf(registerdTypes);
+        // always true for now
+        this.registrationRequired = true;
+    }
+
+    /**
+     * Populates the Kryo pool.
+     *
+     * @param instances to add to the pool
+     * @return this
+     */
+    public KryoPool populate(int instances) {
+        List<Kryo> kryos = new ArrayList<>(instances);
+        for (int i = 0; i < instances; ++i) {
+            kryos.add(newKryoInstance());
+        }
+        pool.addAll(kryos);
+        return this;
+    }
+
+    /**
+     * Gets a Kryo instance from the pool.
+     *
+     * @return Kryo instance
+     */
+    public Kryo getKryo() {
+        Kryo kryo = pool.poll();
+        if (kryo == null) {
+            return newKryoInstance();
+        }
+        return kryo;
+    }
+
+    /**
+     * Returns a Kryo instance to the pool.
+     *
+     * @param kryo instance obtained from this pool.
+     */
+    public void putKryo(Kryo kryo) {
+        if (kryo != null) {
+            pool.add(kryo);
+        }
+    }
+
+    /**
+     * Serializes given object to byte array using Kryo instance in pool.
+     * <p>
+     * Note: Serialized bytes must be smaller than {@link #DEFAULT_BUFFER_SIZE}.
+     *
+     * @param obj Object to serialize
+     * @return serialized bytes
+     */
+    public byte[] serialize(final Object obj) {
+        return serialize(obj, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Serializes given object to byte array using Kryo instance in pool.
+     *
+     * @param obj Object to serialize
+     * @param bufferSize maximum size of serialized bytes
+     * @return serialized bytes
+     */
+    public byte[] serialize(final Object obj, final int bufferSize) {
+        Output out = new Output(bufferSize);
+        Kryo kryo = getKryo();
+        try {
+            kryo.writeClassAndObject(out, obj);
+            return out.toBytes();
+        } finally {
+            putKryo(kryo);
+        }
+    }
+
+    /**
+     * Deserializes given byte array to Object using Kryo instance in pool.
+     *
+     * @param bytes serialized bytes
+     * @param <T> deserialized Object type
+     * @return deserialized Object
+     */
+    public <T> T deserialize(final byte[] bytes) {
+        Input in = new Input(bytes);
+        Kryo kryo = getKryo();
+        try {
+            @SuppressWarnings("unchecked")
+            T obj = (T) kryo.readClassAndObject(in);
+            return obj;
+        } finally {
+            putKryo(kryo);
+        }
+    }
+
+
+    /**
+     * Creates a Kryo instance with {@link #registeredTypes} pre-registered.
+     *
+     * @return Kryo instance
+     */
+    private Kryo newKryoInstance() {
+        Kryo kryo = new Kryo();
+        kryo.setRegistrationRequired(registrationRequired);
+        for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
+            if (registry.getRight() == null) {
+                kryo.register(registry.getLeft());
+            } else {
+                kryo.register(registry.getLeft(), registry.getRight());
+            }
+        }
+        return kryo;
+    }
+}