Implement compact serialization for a set of discrete resources

This is for ONOS-4281.

Change-Id: I08a9fc4fd334c499c7a09d2960145743a798094e
diff --git a/core/api/src/main/java/org/onosproject/net/resource/DiscreteResourceSet.java b/core/api/src/main/java/org/onosproject/net/resource/DiscreteResourceSet.java
new file mode 100644
index 0000000..37a3f0f
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/resource/DiscreteResourceSet.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.resource;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Represents a set of discrete type resources.
+ * This class is intended to be used by ConsistentResourceStore though it is exposed to the public.
+ */
+@Beta
+public final class DiscreteResourceSet {
+    private final Set<DiscreteResource> values;
+    private final DiscreteResourceCodec codec;
+
+    /**
+     * Creates an instance with resources and the codec for them.
+     *
+     * @param values resources to be contained in the instance
+     * @param codec codec for the specified resources
+     * @return an instance
+     */
+    public static DiscreteResourceSet of(Set<DiscreteResource> values, DiscreteResourceCodec codec) {
+        checkNotNull(values);
+        checkNotNull(codec);
+        checkArgument(!values.isEmpty());
+
+        return new DiscreteResourceSet(ImmutableSet.copyOf(values), codec);
+    }
+
+    /**
+     * Creates the instance representing an empty resource set.
+     *
+     * @return an empty resource set
+     */
+    public static DiscreteResourceSet empty() {
+        return new DiscreteResourceSet(ImmutableSet.of(), NoOpCodec.INSTANCE);
+    }
+
+    private DiscreteResourceSet(Set<DiscreteResource> values, DiscreteResourceCodec codec) {
+        this.values = values;
+        this.codec = codec;
+    }
+
+    private DiscreteResourceSet() {
+        this.values = null;
+        this.codec = null;
+    }
+
+    /**
+     * Returns resources contained in this instance.
+     *
+     * @return resources
+     */
+    public Set<DiscreteResource> values() {
+        return values;
+    }
+
+    /**
+     * Returns the parent resource of the resources contained in this instance.
+     *
+     * @return the parent resource of the resources
+     */
+    public DiscreteResourceId parent() {
+        if (values.isEmpty()) {
+            // Dummy value avoiding null
+            return ResourceId.ROOT;
+        }
+        Optional<DiscreteResourceId> parent = values.iterator().next().id().parent();
+        checkState(parent.isPresent());
+
+        return parent.get();
+    }
+
+    /**
+     * Returns the codec for the resources contained in this instance.
+     *
+     * @return the codec for the resources
+     */
+    public DiscreteResourceCodec codec() {
+        return codec;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(values, codec);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        final DiscreteResourceSet other = (DiscreteResourceSet) obj;
+        return Objects.equals(this.values, other.values)
+                && Objects.equals(this.codec, other.codec);
+    }
+
+}
diff --git a/core/api/src/main/java/org/onosproject/net/resource/DiscreteResourceSetSerializer.java b/core/api/src/main/java/org/onosproject/net/resource/DiscreteResourceSetSerializer.java
new file mode 100644
index 0000000..ee81211
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/resource/DiscreteResourceSetSerializer.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.resource;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.annotations.Beta;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Range;
+import com.google.common.collect.TreeRangeSet;
+import org.onlab.util.ClosedOpenRange;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Kryo serializer for {@link DiscreteResourceSet}.
+ */
+@Beta
+public final class DiscreteResourceSetSerializer extends Serializer<DiscreteResourceSet> {
+
+    public DiscreteResourceSetSerializer() {
+        super(false, true);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, DiscreteResourceSet object) {
+        TreeRangeSet<Integer> rangeSet = TreeRangeSet.create();
+        object.values().stream()
+                .map(x -> object.codec().encode(x))
+                .map(Range::singleton)
+                .map(x -> x.canonical(DiscreteDomain.integers()))
+                .forEach(rangeSet::add);
+        List<ClosedOpenRange> ranges = rangeSet.asRanges().stream()
+                .map(ClosedOpenRange::of)
+                .collect(Collectors.toList());
+        kryo.writeObject(output, ranges);
+        kryo.writeClassAndObject(output, object.codec());
+        kryo.writeObject(output, object.parent());
+    }
+
+    @Override
+    public DiscreteResourceSet read(Kryo kryo, Input input, Class<DiscreteResourceSet> type) {
+        @SuppressWarnings("unchecked")
+        List<ClosedOpenRange> ranges = kryo.readObject(input, ArrayList.class);
+        DiscreteResourceCodec codec = (DiscreteResourceCodec) kryo.readClassAndObject(input);
+        DiscreteResourceId parent = kryo.readObject(input, DiscreteResourceId.class);
+
+        if (ranges.isEmpty()) {
+            return DiscreteResourceSet.empty();
+        }
+
+        Set<DiscreteResource> resources = ranges.stream()
+                .flatMapToInt(x -> IntStream.range(x.lowerBound(), x.upperBound()))
+                .mapToObj(x -> codec.decode(parent, x))
+                .collect(Collectors.toSet());
+
+        return DiscreteResourceSet.of(resources, codec);
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/resource/NoOpCodec.java b/core/api/src/main/java/org/onosproject/net/resource/NoOpCodec.java
new file mode 100644
index 0000000..942848b
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/resource/NoOpCodec.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.resource;
+
+/**
+ * Represents no-op codec intended to used in an empty discrete resource set only.
+ * It's not supposed to be used by other classes.
+ */
+public class NoOpCodec implements DiscreteResourceCodec {
+    public static final DiscreteResourceCodec INSTANCE = new NoOpCodec();
+
+    @Override
+    public int encode(DiscreteResource resource) {
+        return 0;
+    }
+
+    @Override
+    public DiscreteResource decode(DiscreteResourceId parent, int value) {
+        return Resource.ROOT;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return NoOpCodec.class.hashCode();
+    }
+}
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 515f9fc..63c8f7d 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -31,6 +31,7 @@
 import org.onlab.packet.TpPort;
 import org.onlab.packet.VlanId;
 import org.onlab.util.Bandwidth;
+import org.onlab.util.ClosedOpenRange;
 import org.onlab.util.Frequency;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Match;
@@ -191,12 +192,18 @@
 import org.onosproject.net.resource.ContinuousResource;
 import org.onosproject.net.resource.ContinuousResourceId;
 import org.onosproject.net.resource.DiscreteResource;
+import org.onosproject.net.resource.DiscreteResourceCodec;
 import org.onosproject.net.resource.DiscreteResourceId;
+import org.onosproject.net.resource.DiscreteResourceSet;
+import org.onosproject.net.resource.DiscreteResourceSetSerializer;
+import org.onosproject.net.resource.MplsCodec;
+import org.onosproject.net.resource.NoOpCodec;
 import org.onosproject.net.resource.ResourceAllocation;
 import org.onosproject.net.packet.DefaultOutboundPacket;
 import org.onosproject.net.packet.DefaultPacketRequest;
 import org.onosproject.net.packet.PacketPriority;
 import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.resource.VlanCodec;
 import org.onosproject.security.Permission;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.primitives.MapUpdate;
@@ -521,7 +528,12 @@
                     org.onlab.packet.MplsLabel.class,
                     org.onlab.packet.MPLS.class
             )
-
+            .register(ClosedOpenRange.class)
+            .register(new DiscreteResourceSetSerializer(), DiscreteResourceSet.class)
+            .register(DiscreteResourceCodec.class)
+            .register(VlanCodec.class)
+            .register(MplsCodec.class)
+            .register(NoOpCodec.class)
             .build();
 
 
diff --git a/core/store/serializers/src/test/java/org/onosproject/store/serializers/KryoSerializerTest.java b/core/store/serializers/src/test/java/org/onosproject/store/serializers/KryoSerializerTest.java
index b159422..883892c 100644
--- a/core/store/serializers/src/test/java/org/onosproject/store/serializers/KryoSerializerTest.java
+++ b/core/store/serializers/src/test/java/org/onosproject/store/serializers/KryoSerializerTest.java
@@ -24,6 +24,7 @@
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.onlab.packet.MplsLabel;
 import org.onlab.packet.VlanId;
 import org.onlab.util.Bandwidth;
 import org.onlab.util.Frequency;
@@ -62,6 +63,9 @@
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleBatchEntry;
 import org.onosproject.net.intent.IntentId;
+import org.onosproject.net.resource.DiscreteResource;
+import org.onosproject.net.resource.DiscreteResourceSet;
+import org.onosproject.net.resource.MplsCodec;
 import org.onosproject.net.resource.ResourceAllocation;
 import org.onosproject.net.resource.Resources;
 import org.onosproject.net.provider.ProviderId;
@@ -80,11 +84,15 @@
 import org.onlab.packet.Ip6Prefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.net.resource.VlanCodec;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.time.Duration;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.*;
@@ -356,6 +364,38 @@
     }
 
     @Test
+    public void testVlanIdResourceSet() {
+        DiscreteResource port = Resources.discrete(DID1, P1).resource();
+
+        Set<DiscreteResource> vlans = IntStream.range(0, 4096)
+                .mapToObj(x -> VlanId.vlanId((short) x))
+                .map(x -> Resources.discrete(port.id(), x).resource())
+                .collect(Collectors.toSet());
+
+        DiscreteResourceSet sut = DiscreteResourceSet.of(vlans, new VlanCodec());
+        testSerializedEquals(sut);
+    }
+
+    @Test
+    public void testMplsLabelResourceSet() {
+        DiscreteResource port = Resources.discrete(DID1, P1).resource();
+
+        Set<DiscreteResource> labels = IntStream.range(0, 1024 * 1024)
+                .mapToObj(MplsLabel::mplsLabel)
+                .map(x -> Resources.discrete(port.id(), x).resource())
+                .collect(Collectors.toSet());
+
+        DiscreteResourceSet sut = DiscreteResourceSet.of(labels, new MplsCodec());
+        testSerializedEquals(sut);
+    }
+
+    @Test
+    public void testEmptyResourceSet() {
+        DiscreteResourceSet sut = DiscreteResourceSet.empty();
+        testSerializedEquals(sut);
+    }
+
+    @Test
     public void testResourceId() {
         testSerializedEquals(Resources.discrete(DID1, P1).id());
     }
diff --git a/utils/misc/src/main/java/org/onlab/util/ClosedOpenRange.java b/utils/misc/src/main/java/org/onlab/util/ClosedOpenRange.java
new file mode 100644
index 0000000..81dd07c
--- /dev/null
+++ b/utils/misc/src/main/java/org/onlab/util/ClosedOpenRange.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Range;
+
+import java.util.Objects;
+
+/**
+ * Represent a closed-open range.
+ * The primary user of this class is the ResourceService implementation.
+ */
+@Beta
+public final class ClosedOpenRange {
+    private final int lowerBound;   // inclusive
+    private final int upperBound;   // exclusive
+
+    /**
+     * Creates a range from a Guava's range.
+     *
+     * @param range Guava's range
+     * @return this range
+     */
+    public static ClosedOpenRange of(Range<Integer> range) {
+        return new ClosedOpenRange(
+                range.canonical(DiscreteDomain.integers()).lowerEndpoint(),
+                range.canonical(DiscreteDomain.integers()).upperEndpoint());
+    }
+
+    /**
+     * Create a range with a lower bound and an upper bound.
+     *
+     * @param lowerBound lower bound (inclusive)
+     * @param upperBound upper bound (exclusive)
+     * @return this range
+     */
+    public static ClosedOpenRange of(int lowerBound, int upperBound) {
+        return new ClosedOpenRange(lowerBound, upperBound);
+    }
+
+    private ClosedOpenRange(int lowerBound, int upperBound) {
+        this.lowerBound = lowerBound;
+        this.upperBound = upperBound;
+    }
+
+    /**
+     * Returns the lower bound.
+     *
+     * @return the lower bound
+     */
+    public int lowerBound() {
+        return lowerBound;
+    }
+
+    /**
+     * Returns the upper bound.
+     *
+     * @return the upper bound
+     */
+    public int upperBound() {
+        return upperBound;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(lowerBound, upperBound);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (!(obj instanceof ClosedOpenRange)) {
+            return false;
+        }
+
+        final ClosedOpenRange other = (ClosedOpenRange) obj;
+        return Objects.equals(this.lowerBound, other.lowerBound)
+                && Objects.equals(this.upperBound, other.upperBound);
+    }
+
+    @Override
+    public String toString() {
+        return "[" + lowerBound + ".." + upperBound + ")";
+    }
+}