Moving LabelResourceManager to incubator

Breaking apart resource package into {device, link, label}
Refactored cluster serializers so they are visible

Change-Id: I71051bcd5e790ae6abeb154bf58286e584c32858
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
new file mode 100644
index 0000000..09a558d
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
@@ -0,0 +1,578 @@
+package org.onosproject.incubator.store.resource.impl;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.incubator.net.resource.label.DefaultLabelResource;
+import org.onosproject.incubator.net.resource.label.LabelResource;
+import org.onosproject.incubator.net.resource.label.LabelResourceDelegate;
+import org.onosproject.incubator.net.resource.label.LabelResourceEvent;
+import org.onosproject.incubator.net.resource.label.LabelResourceEvent.Type;
+import org.onosproject.incubator.net.resource.label.LabelResourceId;
+import org.onosproject.incubator.net.resource.label.LabelResourcePool;
+import org.onosproject.incubator.net.resource.label.LabelResourceRequest;
+import org.onosproject.incubator.net.resource.label.LabelResourceStore;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.flow.ReplicaInfo;
+import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+
+/**
+ * Manages label resources using copycat.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class DistributedLabelResourceStore
+        extends AbstractStore<LabelResourceEvent, LabelResourceDelegate>
+        implements LabelResourceStore {
+    private final Logger log = getLogger(getClass());
+
+    // Name given to the consistent map that holds the pools (see activate()).
+    private static final String POOL_MAP_NAME = "labelresourcepool";
+
+    // Device id string under which the global (device-independent) pool is keyed.
+    private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id";
+    // primary data:
+    // read/write needs to be locked
+    // The write side of this lock is taken by the internal* mutators around
+    // their read-modify-write sequences on resourcePool.
+    private final ReentrantReadWriteLock resourcePoolLock = new ReentrantReadWriteLock();
+
+    // Pool per device id; built in activate(), so null until the component is activated.
+    private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    // Used to look up the master node of a device before an operation is run or forwarded.
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ReplicaInfoService replicaInfoManager;
+
+    // Carries pool requests to the master node and delivers the requests of peers.
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    // Supplies the local node id that is compared against a device's master.
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    // Used to check that a device is known before its pool is touched.
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    // Executor on which the cluster message handlers registered in activate() run.
+    private ExecutorService messageHandlingExecutor;
+    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+    // Upper bound, in milliseconds, that complete() waits for a peer's reply.
+    private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
+
+    /**
+     * Serializer used to encode and decode the payloads of the label pool
+     * cluster messages exchanged between nodes.
+     * <p>
+     * It registers the common store namespace first and then the label
+     * resource types, starting at the custom id range.
+     */
+    protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            // Registration order determines the assigned type ids, so it must
+            // stay the same on every node. NOTE(review): presumably new types
+            // should only be appended -- confirm against KryoNamespace.
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(DistributedStoreSerializers.STORE_COMMON)
+                    .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
+                    .register(LabelResourceEvent.class)
+                    .register(LabelResourcePool.class).register(DeviceId.class)
+                    .register(LabelResourceRequest.class)
+                    .register(LabelResourceRequest.Type.class)
+                    .register(LabelResourceEvent.Type.class)
+                    .register(DefaultLabelResource.class)
+                    .register(LabelResourceId.class).build();
+        }
+    };
+
+    /**
+     * Builds the pool map and subscribes to the label pool cluster messages.
+     * <p>
+     * Every subscriber decodes the payload with {@link #SERIALIZER}, runs the
+     * matching {@code internal*} operation on this node and responds with the
+     * encoded result. The handlers run on {@code messageHandlingExecutor}.
+     */
+    @Activate
+    public void activate() {
+
+        resourcePool = storageService
+                .<DeviceId, LabelResourcePool>consistentMapBuilder()
+                .withName(POOL_MAP_NAME)
+                .withSerializer(new Serializer() {
+                    KryoNamespace kryo = new KryoNamespace.Builder()
+                            .register(KryoNamespaces.API).build();
+
+                    @Override
+                    public <T> byte[] encode(T object) {
+                        return kryo.serialize(object);
+                    }
+
+                    @Override
+                    public <T> T decode(byte[] bytes) {
+                        return kryo.deserialize(bytes);
+                    }
+                })
+                .withPartitionsDisabled()
+                .build();
+
+        // NOTE(review): the thread group name still says "flow"; it looks
+        // copied from the flow store. Left unchanged because thread names may
+        // be relied on by operators -- confirm before renaming.
+        messageHandlingExecutor = Executors
+                .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                                    groupedThreads("onos/store/flow",
+                                                   "message-handlers"));
+
+        clusterCommunicator
+                .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
+                               new ClusterMessageHandler() {
+
+                                   @Override
+                                   public void handle(ClusterMessage message) {
+                                       LabelResourcePool operation = SERIALIZER
+                                               .decode(message.payload());
+                                       log.trace("received label resource pool create request for {}",
+                                                 operation);
+                                       boolean b = internalCreate(operation);
+                                       message.respond(SERIALIZER.encode(b));
+                                   }
+                               }, messageHandlingExecutor);
+        clusterCommunicator
+                .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
+                               new ClusterMessageHandler() {
+
+                                   @Override
+                                   public void handle(ClusterMessage message) {
+                                       DeviceId deviceId = SERIALIZER
+                                               .decode(message.payload());
+                                       log.trace("received label resource pool destroy request for {}",
+                                                 deviceId);
+                                       boolean b = internalDestroy(deviceId);
+                                       message.respond(SERIALIZER.encode(b));
+                                   }
+                               }, messageHandlingExecutor);
+        clusterCommunicator
+                .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY,
+                               new ClusterMessageHandler() {
+
+                                   @Override
+                                   public void handle(ClusterMessage message) {
+                                       LabelResourceRequest request = SERIALIZER
+                                               .decode(message.payload());
+                                       log.trace("received label resource apply request for {}",
+                                                 request);
+                                       final Collection<LabelResource> resource = internalApply(request);
+                                       message.respond(SERIALIZER
+                                               .encode(resource));
+                                   }
+                               }, messageHandlingExecutor);
+        clusterCommunicator
+                .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
+                               new ClusterMessageHandler() {
+
+                                   @Override
+                                   public void handle(ClusterMessage message) {
+                                       LabelResourceRequest request = SERIALIZER
+                                               .decode(message.payload());
+                                       log.trace("received label resource release request for {}",
+                                                 request);
+                                       final boolean isSuccess = internalRelease(request);
+                                       message.respond(SERIALIZER
+                                               .encode(isSuccess));
+                                   }
+                               }, messageHandlingExecutor);
+        log.info("Started");
+    }
+
+    /**
+     * Removes the four label pool subscribers and shuts down the executor
+     * their handlers ran on.
+     */
+    @Deactivate
+    public void deactivate() {
+        clusterCommunicator.removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED);
+        clusterCommunicator.removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY);
+        clusterCommunicator.removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED);
+        clusterCommunicator.removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE);
+
+        messageHandlingExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    /**
+     * Creates a label pool for a device covering the given label range.
+     *
+     * @param deviceId   device that owns the pool
+     * @param beginLabel first label of the range
+     * @param endLabel   last label of the range
+     * @return the result of {@code create(LabelResourcePool)} for the new pool
+     */
+    @Override
+    public boolean createDevicePool(DeviceId deviceId,
+                                    LabelResourceId beginLabel,
+                                    LabelResourceId endLabel) {
+        final String owner = deviceId.toString();
+        return create(new LabelResourcePool(owner,
+                                            beginLabel.labelId(),
+                                            endLabel.labelId()));
+    }
+
+    /**
+     * Creates the global label pool, keyed by the reserved global device id.
+     * The pool is stored on this node directly, without a mastership lookup.
+     *
+     * @param beginLabel first label of the range
+     * @param endLabel   last label of the range
+     * @return the result of {@code internalCreate(LabelResourcePool)}
+     */
+    @Override
+    public boolean createGlobalPool(LabelResourceId beginLabel,
+                                    LabelResourceId endLabel) {
+        return internalCreate(new LabelResourcePool(GLOBAL_RESOURCE_POOL_DEVICE_ID,
+                                                    beginLabel.labelId(),
+                                                    endLabel.labelId()));
+    }
+
+    /**
+     * Creates the pool on the master node of the pool's device.
+     * <p>
+     * If this node is the master the pool is stored locally; otherwise the
+     * request is forwarded to the master and its answer is awaited.
+     *
+     * @param pool pool to create
+     * @return true if the pool was created; false if the device is unknown,
+     *         has no master, a pool already exists, or the remote call failed
+     */
+    private boolean create(LabelResourcePool pool) {
+        if (deviceService.getDevice(pool.deviceId()) == null) {
+            return false;
+        }
+
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(pool
+                .deviceId());
+
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("Failed to create label resource pool: No master for {}", pool);
+            return false;
+        }
+
+        if (replicaInfo.master().get()
+                .equals(clusterService.getLocalNode().id())) {
+            return internalCreate(pool);
+        }
+
+        log.trace("Forwarding label resource pool creation to {}, which is the primary (master) for device {}",
+                  replicaInfo.master().orNull(), pool.deviceId());
+
+        // complete() yields null on timeout/failure; unboxing that directly
+        // into the boolean return would throw a NullPointerException.
+        Boolean created = complete(clusterCommunicator
+                .sendAndReceive(pool,
+                                LabelResourceMessageSubjects.LABEL_POOL_CREATED,
+                                SERIALIZER::encode, SERIALIZER::decode,
+                                replicaInfo.master().get()));
+        return created != null && created;
+    }
+
+    /**
+     * Stores the pool on this node unless its device already has one, and
+     * notifies the delegate with a {@code POOL_CREATED} event on success.
+     *
+     * @param pool pool to store
+     * @return true if the pool was stored, false if one already existed
+     */
+    private boolean internalCreate(LabelResourcePool pool) {
+        resourcePoolLock.writeLock().lock();
+        try {
+            // A missing mapping may come back as a null Versioned rather than
+            // a Versioned holding null, so both cases are checked.
+            Versioned<LabelResourcePool> existing = resourcePool.get(pool.deviceId());
+            if (existing != null && existing.value() != null) {
+                return false;
+            }
+            resourcePool.put(pool.deviceId(), pool);
+        } finally {
+            // Released in finally so a failing map call cannot leave the
+            // write lock held forever.
+            resourcePoolLock.writeLock().unlock();
+        }
+        // The delegate is notified outside the lock, as before.
+        notifyDelegate(new LabelResourceEvent(Type.POOL_CREATED, pool));
+        return true;
+    }
+
+    /**
+     * Destroys the label pool of a device on the device's master node.
+     * <p>
+     * If this node is the master the pool is removed locally; otherwise the
+     * request is forwarded to the master and its answer is awaited.
+     *
+     * @param deviceId device whose pool is destroyed
+     * @return true if the master reported success; false if the device is
+     *         unknown, has no master, or the remote call failed
+     */
+    @Override
+    public boolean destroyDevicePool(DeviceId deviceId) {
+        if (deviceService.getDevice(deviceId) == null) {
+            return false;
+        }
+        ReplicaInfo replicaInfo = replicaInfoManager
+                .getReplicaInfoFor(deviceId);
+
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("Failed to destroy label resource pool: No master for {}", deviceId);
+            return false;
+        }
+
+        if (replicaInfo.master().get()
+                .equals(clusterService.getLocalNode().id())) {
+            return internalDestroy(deviceId);
+        }
+
+        log.trace("Forwarding label resource pool destruction to {}, which is the primary (master) for device {}",
+                  replicaInfo.master().orNull(), deviceId);
+
+        // complete() yields null on timeout/failure; unboxing that directly
+        // into the boolean return would throw a NullPointerException.
+        Boolean destroyed = complete(clusterCommunicator
+                .sendAndReceive(deviceId,
+                                LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
+                                SERIALIZER::encode, SERIALIZER::decode,
+                                replicaInfo.master().get()));
+        return destroyed != null && destroyed;
+    }
+
+    /**
+     * Removes the pool of the given device from this node, if present, and
+     * notifies the delegate with a {@code POOL_DESTROYED} event.
+     *
+     * @param deviceId device whose pool is removed
+     * @return always true, whether or not a pool existed
+     */
+    private boolean internalDestroy(DeviceId deviceId) {
+        LabelResourcePool removed = null;
+        // Same check-then-act shape as the other mutators, so it takes the
+        // same write lock.
+        resourcePoolLock.writeLock().lock();
+        try {
+            // A missing mapping may come back as a null Versioned.
+            Versioned<LabelResourcePool> existing = resourcePool.get(deviceId);
+            if (existing != null && existing.value() != null) {
+                removed = existing.value();
+                resourcePool.remove(deviceId);
+            }
+        } finally {
+            resourcePoolLock.writeLock().unlock();
+        }
+        if (removed != null) {
+            // The original emitted POOL_CREATED here, which misreports the
+            // removal to listeners.
+            notifyDelegate(new LabelResourceEvent(Type.POOL_DESTROYED, removed));
+        }
+        log.info("success to destroy the label resource pool of device id {}",
+                 deviceId);
+        return true;
+    }
+
+    /**
+     * Allocates labels from the pool of a device on the device's master node.
+     * <p>
+     * If this node is the master the allocation happens locally; otherwise
+     * the request is forwarded to the master and its answer is awaited.
+     *
+     * @param deviceId device whose pool is used
+     * @param applyNum number of labels requested
+     * @return the allocated labels; empty (never null) if the device is
+     *         unknown, has no master, the pool cannot satisfy the request,
+     *         or the remote call failed
+     */
+    @Override
+    public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId,
+                                                         long applyNum) {
+        if (deviceService.getDevice(deviceId) == null) {
+            return Collections.emptyList();
+        }
+        LabelResourceRequest request = new LabelResourceRequest(
+                                                                deviceId,
+                                                                LabelResourceRequest.Type.APPLY,
+                                                                applyNum, null);
+        ReplicaInfo replicaInfo = replicaInfoManager
+                .getReplicaInfoFor(deviceId);
+
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("Failed to apply label resource: No master for {}", deviceId);
+            return Collections.emptyList();
+        }
+
+        if (replicaInfo.master().get()
+                .equals(clusterService.getLocalNode().id())) {
+            return internalApply(request);
+        }
+
+        log.trace("Forwarding label resource apply to {}, which is the primary (master) for device {}",
+                  replicaInfo.master().orNull(), deviceId);
+
+        Collection<LabelResource> applied = complete(clusterCommunicator
+                .sendAndReceive(request,
+                                LabelResourceMessageSubjects.LABEL_POOL_APPLY,
+                                SERIALIZER::encode, SERIALIZER::decode,
+                                replicaInfo.master().get()));
+        // complete() yields null on timeout/failure; every other failure path
+        // of this method returns an empty collection, so this one does too.
+        if (applied == null) {
+            return Collections.<LabelResource>emptyList();
+        }
+        return applied;
+    }
+
+    /**
+     * Allocates labels from the pool stored on this node.
+     * <p>
+     * Previously released labels are handed out first; the remainder is taken
+     * from the unused range starting at the pool's current maximum label id.
+     * The pool is then written back with updated counters.
+     *
+     * @param request apply request carrying the device id and label count
+     * @return the allocated labels; empty if the count is not positive, no
+     *         pool exists for the device, or the pool has too few free labels
+     */
+    private Collection<LabelResource> internalApply(LabelResourceRequest request) {
+        DeviceId deviceId = request.deviceId();
+        long applyNum = request.applyNum();
+        if (applyNum <= 0) {
+            // A negative count would move the pool's counters backwards.
+            return Collections.<LabelResource>emptyList();
+        }
+        resourcePoolLock.writeLock().lock();
+        try {
+            // A missing mapping may come back as a null Versioned.
+            Versioned<LabelResourcePool> versioned = resourcePool.get(deviceId);
+            LabelResourcePool pool = versioned == null ? null : versioned.value();
+            if (pool == null) {
+                log.info("the label resource pool of device id {} does not exist",
+                         deviceId);
+                return Collections.<LabelResource>emptyList();
+            }
+
+            long currentMax = pool.currentUsedMaxLabelId().labelId();
+            Set<LabelResource> releaseLabels = new HashSet<LabelResource>(
+                    pool.releaseLabelId());
+            // Same formula as getFreeNumOfDevicePool, computed from the pool
+            // already read so the map is not queried a second time.
+            long freeNum = pool.endLabel().labelId() - currentMax
+                    + releaseLabels.size();
+            if (applyNum > freeNum) {
+                log.info("the free number of the label resource pool of deviceId {} is not enough.",
+                         deviceId);
+                return Collections.<LabelResource>emptyList();
+            }
+
+            Collection<LabelResource> result = new HashSet<LabelResource>();
+
+            // Reuse released labels first, removing each from the free set.
+            long reused = Math.min(releaseLabels.size(), applyNum);
+            Iterator<LabelResource> it = releaseLabels.iterator();
+            for (long i = 0; i < reused; i++) {
+                result.add(it.next());
+                it.remove();
+            }
+
+            // Take the rest from the unused range [currentMax, newCurrent).
+            long newCurrent = currentMax + (applyNum - reused);
+            for (long j = currentMax; j < newCurrent; j++) {
+                result.add(new DefaultLabelResource(deviceId,
+                                                    LabelResourceId
+                                                            .labelResourceId(j)));
+            }
+
+            ImmutableSet<LabelResource> freeLabel = ImmutableSet
+                    .copyOf(releaseLabels);
+            LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
+                                                              pool.beginLabel().labelId(),
+                                                              pool.endLabel().labelId(),
+                                                              pool.totalNum(),
+                                                              pool.usedNum() + applyNum,
+                                                              newCurrent, freeLabel);
+            resourcePool.put(deviceId, newPool);
+            log.info("success to apply label resource");
+            return result;
+        } finally {
+            // Released in finally so no exception can leave the write lock
+            // held forever.
+            resourcePoolLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) {
+        Map<DeviceId, Collection<LabelResource>> maps = release.asMap();
+        Set<DeviceId> deviceIdSet = maps.keySet();
+        LabelResourceRequest request = null;
+        for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) {
+            DeviceId deviceId = (DeviceId) it.next();
+            Device device = (Device) deviceService.getDevice(deviceId);
+            if (device == null) {
+                continue;
+            }
+            ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps
+                    .get(deviceId));
+            request = new LabelResourceRequest(
+                                               deviceId,
+                                               LabelResourceRequest.Type.RELEASE,
+                                               0, collection);
+            ReplicaInfo replicaInfo = replicaInfoManager
+                    .getReplicaInfoFor(deviceId);
+
+            if (!replicaInfo.master().isPresent()) {
+                log.warn("Failed to getFlowEntries: No master for {}", deviceId);
+                return false;
+            }
+
+            if (replicaInfo.master().get()
+                    .equals(clusterService.getLocalNode().id())) {
+                return internalRelease(request);
+            }
+
+            log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
+                      replicaInfo.master().orNull(), deviceId);
+
+            return complete(clusterCommunicator
+                    .sendAndReceive(request,
+                                    LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
+                                    SERIALIZER::encode, SERIALIZER::decode,
+                                    replicaInfo.master().get()));
+        }
+        return false;
+    }
+
+    /**
+     * Returns labels to the pool stored on this node.
+     * <p>
+     * Labels outside the pool's range are ignored. A label is counted as
+     * released only if it lies below the pool's current maximum label id
+     * (i.e. it has been handed out) and is not already in the free set. The
+     * pool is then written back with the updated free set and used count.
+     *
+     * @param request release request carrying the device id and the labels
+     * @return true if the pool was updated, false if no pool exists
+     */
+    private boolean internalRelease(LabelResourceRequest request) {
+        DeviceId deviceId = request.deviceId();
+        Collection<LabelResource> release = request.releaseCollection();
+        resourcePoolLock.writeLock().lock();
+        try {
+            // A missing mapping may come back as a null Versioned.
+            Versioned<LabelResourcePool> versioned = resourcePool.get(deviceId);
+            LabelResourcePool pool = versioned == null ? null : versioned.value();
+            if (pool == null) {
+                log.info("the label resource pool of device id {} does not exist",
+                         deviceId);
+                return false;
+            }
+
+            long beginNum = pool.beginLabel().labelId();
+            long endNum = pool.endLabel().labelId();
+            long current = pool.currentUsedMaxLabelId().labelId();
+            Set<LabelResource> storeSet = new HashSet<LabelResource>(
+                    pool.releaseLabelId());
+
+            long realReleasedNum = 0;
+            for (LabelResource labelResource : release) {
+                long labelId = labelResource.labelResourceId().labelId();
+                if (labelId < beginNum || labelId > endNum) {
+                    continue;
+                }
+                // The original used "||" here, which counted a label that was
+                // already free (double release) and accepted labels that were
+                // never handed out; both skew usedNum and the free set.
+                if (labelId < current && storeSet.add(labelResource)) {
+                    realReleasedNum++;
+                }
+            }
+
+            ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet);
+            LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
+                                                              beginNum, endNum,
+                                                              pool.totalNum(),
+                                                              pool.usedNum() - realReleasedNum,
+                                                              current, s);
+            resourcePool.put(deviceId, newPool);
+            log.info("success to release label resource");
+            return true;
+        } finally {
+            // Released in finally so no exception can leave the write lock
+            // held forever.
+            resourcePoolLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean isDevicePoolFull(DeviceId deviceId) {
+        LabelResourcePool pool = resourcePool.get(deviceId).value();
+        if (pool == null) {
+            return true;
+        }
+        return pool.currentUsedMaxLabelId() == pool.endLabel()
+                && pool.releaseLabelId().size() == 0 ? true : false;
+    }
+
+    /**
+     * Counts the labels that can still be handed out from a device's pool.
+     *
+     * @param deviceId device whose pool is inspected
+     * @return the distance from the current maximum label id to the end
+     *         label plus the number of released labels; 0 if no pool exists
+     */
+    @Override
+    public long getFreeNumOfDevicePool(DeviceId deviceId) {
+        // A missing mapping may come back as a null Versioned.
+        Versioned<LabelResourcePool> versioned = resourcePool.get(deviceId);
+        LabelResourcePool pool = versioned == null ? null : versioned.value();
+        if (pool == null) {
+            return 0;
+        }
+        return pool.endLabel().labelId()
+                - pool.currentUsedMaxLabelId().labelId()
+                + pool.releaseLabelId().size();
+    }
+
+    /**
+     * Looks up the label pool of a device.
+     *
+     * @param deviceId device whose pool is requested
+     * @return the pool, or null if the device has none
+     */
+    @Override
+    public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) {
+        // A missing mapping may come back as a null Versioned; calling
+        // value() on it unchecked would throw instead of reporting "no pool".
+        Versioned<LabelResourcePool> versioned = resourcePool.get(deviceId);
+        return versioned == null ? null : versioned.value();
+    }
+
+    /**
+     * Destroys the global label pool stored on this node.
+     *
+     * @return the result of {@code internalDestroy(DeviceId)} for the
+     *         reserved global device id
+     */
+    @Override
+    public boolean destroyGlobalPool() {
+        final DeviceId globalId = DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID);
+        return internalDestroy(globalId);
+    }
+
+    /**
+     * Allocates labels from the global pool stored on this node.
+     *
+     * @param applyNum number of labels requested
+     * @return the result of {@code internalApply(LabelResourceRequest)}
+     */
+    @Override
+    public Collection<LabelResource> applyFromGlobalPool(long applyNum) {
+        final DeviceId globalId = DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID);
+        return internalApply(new LabelResourceRequest(globalId,
+                                                      LabelResourceRequest.Type.APPLY,
+                                                      applyNum, null));
+    }
+
+    /**
+     * Releases labels back to the global pool stored on this node.
+     *
+     * @param release ids of the labels to release
+     * @return the result of {@code internalRelease(LabelResourceRequest)}
+     */
+    @Override
+    public boolean releaseToGlobalPool(Set<LabelResourceId> release) {
+        DeviceId globalId = DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID);
+        // Wrap each id in a resource bound to the global pseudo device, the
+        // form in which the pool keeps its released labels.
+        Set<LabelResource> labels = new HashSet<LabelResource>();
+        for (LabelResourceId labelId : release) {
+            labels.add(new DefaultLabelResource(globalId, labelId));
+        }
+        // The original tagged this request as Type.APPLY; RELEASE matches
+        // what releaseToDevicePool sends for the same operation.
+        LabelResourceRequest request = new LabelResourceRequest(
+                                                                globalId,
+                                                                LabelResourceRequest.Type.RELEASE,
+                                                                0,
+                                                                ImmutableSet
+                                                                        .copyOf(labels));
+        return this.internalRelease(request);
+    }
+
+    /**
+     * Tells whether the global pool has no label left to hand out.
+     *
+     * @return the result of {@code isDevicePoolFull(DeviceId)} for the
+     *         reserved global device id
+     */
+    @Override
+    public boolean isGlobalPoolFull() {
+        final DeviceId globalId = DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID);
+        return isDevicePoolFull(globalId);
+    }
+
+    /**
+     * Counts the labels that can still be handed out from the global pool.
+     *
+     * @return the result of {@code getFreeNumOfDevicePool(DeviceId)} for the
+     *         reserved global device id
+     */
+    @Override
+    public long getFreeNumOfGlobalPool() {
+        final DeviceId globalId = DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID);
+        return getFreeNumOfDevicePool(globalId);
+    }
+
+    /**
+     * Looks up the global label pool.
+     *
+     * @return the result of {@code getDeviceLabelResourcePool(DeviceId)} for
+     *         the reserved global device id
+     */
+    @Override
+    public LabelResourcePool getGlobalLabelResourcePool() {
+        final DeviceId globalId = DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID);
+        return getDeviceLabelResourcePool(globalId);
+    }
+
+    /**
+     * Waits up to {@code PEER_REQUEST_TIMEOUT_MS} milliseconds for a remote
+     * operation to finish.
+     *
+     * @param future pending result of the remote operation
+     * @param <T>    result type
+     * @return the result, or null if the wait was interrupted, timed out, or
+     *         the operation failed; the failure is logged
+     */
+    private <T> T complete(Future<T> future) {
+        T outcome = null;
+        try {
+            outcome = future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag for code further up the stack.
+            Thread.currentThread().interrupt();
+            log.error("Interrupted while waiting for operation to complete.", e);
+        } catch (TimeoutException | ExecutionException e) {
+            log.error("Failed remote operation", e);
+        }
+        return outcome;
+    }
+}