【ONOS-1223】【ONOS-1870】the implements of label resource APIs.it include
commands
used to test
if there is any bug,LabelResourceManager,LabelResourceStore using
copycat,and junit test code.
the distribution strategy is that the master of devices handle all the
requests if applied label belongs to it.except for query request.
label store uses copycat instead of hazelcast to keep strong consistency

Change-Id: I77bde6a96f33098063573d37ed1ba787ae21973f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/DistributedLabelResourceStore.java b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/DistributedLabelResourceStore.java
new file mode 100644
index 0000000..ce122c0
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/DistributedLabelResourceStore.java
@@ -0,0 +1,578 @@
+package org.onosproject.store.resource.impl;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.resource.DefaultLabelResource;
+import org.onosproject.net.resource.LabelResource;
+import org.onosproject.net.resource.LabelResourceDelegate;
+import org.onosproject.net.resource.LabelResourceEvent;
+import org.onosproject.net.resource.LabelResourceEvent.Type;
+import org.onosproject.net.resource.LabelResourceId;
+import org.onosproject.net.resource.LabelResourcePool;
+import org.onosproject.net.resource.LabelResourceRequest;
+import org.onosproject.net.resource.LabelResourceStore;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.flow.ReplicaInfo;
+import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+
+/**
+ * Manages label resources using copycat.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class DistributedLabelResourceStore
+        extends AbstractStore<LabelResourceEvent, LabelResourceDelegate>
+        implements LabelResourceStore {
+    private final Logger log = getLogger(getClass());
+
+    private static final String POOL_MAP_NAME = "labelresourcepool";
+
+    private static final String GLOBAL_RESOURCE_POOL_DEVICE_ID = "global_resource_pool_device_id";
+    // primary data:
+    // read/write needs to be locked
+    private final ReentrantReadWriteLock resourcePoolLock = new ReentrantReadWriteLock();
+
+    private ConsistentMap<DeviceId, LabelResourcePool> resourcePool = null;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ReplicaInfoService replicaInfoManager;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    private ExecutorService messageHandlingExecutor;
+    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+    private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
+
+    protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(DistributedStoreSerializers.STORE_COMMON)
+                    .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
+                    .register(LabelResourceEvent.class)
+                    .register(LabelResourcePool.class).register(DeviceId.class)
+                    .register(LabelResourceRequest.class)
+                    .register(LabelResourceRequest.Type.class)
+                    .register(LabelResourceEvent.Type.class)
+                    .register(DefaultLabelResource.class)
+                    .register(LabelResourceId.class).build();
+        }
+    };
+
+    @Activate
+    public void activate() {
+
+        resourcePool = storageService
+                .<DeviceId, LabelResourcePool>consistentMapBuilder()
+                .withName(POOL_MAP_NAME).withSerializer(new Serializer() {
+                    KryoNamespace kryo = new KryoNamespace.Builder()
+                            .register(KryoNamespaces.API).build();
+
+                    @Override
+                    public <T> byte[] encode(T object) {
+                        return kryo.serialize(object);
+                    }
+
+                    @Override
+                    public <T> T decode(byte[] bytes) {
+                        return kryo.deserialize(bytes);
+                    }
+                }).withPartitionsDisabled().build();
+        messageHandlingExecutor = Executors
+                .newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
+                                    groupedThreads("onos/store/flow",
+                                                   "message-handlers"));
+        clusterCommunicator
+                .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
+                               new ClusterMessageHandler() {
+
+                                   @Override
+                                   public void handle(ClusterMessage message) {
+                                       LabelResourcePool operation = SERIALIZER
+                                               .decode(message.payload());
+                                       log.trace("received get flow entry request for {}",
+                                                 operation);
+                                       boolean b = internalCreate(operation);
+                                           message.respond(SERIALIZER.encode(b));
+                                   }
+                               }, messageHandlingExecutor);
+        clusterCommunicator
+                .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
+                               new ClusterMessageHandler() {
+
+                                   @Override
+                                   public void handle(ClusterMessage message) {
+                                       DeviceId deviceId = SERIALIZER
+                                               .decode(message.payload());
+                                       log.trace("received get flow entry request for {}",
+                                                 deviceId);
+                                       boolean b = internalDestroy(deviceId);
+                                           message.respond(SERIALIZER.encode(b));
+                                   }
+                               }, messageHandlingExecutor);
+        clusterCommunicator
+                .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY,
+                               new ClusterMessageHandler() {
+
+                                   @Override
+                                   public void handle(ClusterMessage message) {
+                                       LabelResourceRequest request = SERIALIZER
+                                               .decode(message.payload());
+                                       log.trace("received get flow entry request for {}",
+                                                 request);
+                                       final Collection<LabelResource> resource = internalApply(request);
+                                           message.respond(SERIALIZER
+                                                   .encode(resource));
+                                   }
+                               }, messageHandlingExecutor);
+        clusterCommunicator
+                .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
+                               new ClusterMessageHandler() {
+
+                                   @Override
+                                   public void handle(ClusterMessage message) {
+                                       LabelResourceRequest request = SERIALIZER
+                                               .decode(message.payload());
+                                       log.trace("received get flow entry request for {}",
+                                                 request);
+                                       final boolean isSuccess = internalRelease(request);
+                                           message.respond(SERIALIZER
+                                                   .encode(isSuccess));
+                                   }
+                               }, messageHandlingExecutor);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        clusterCommunicator
+                .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED);
+        clusterCommunicator
+                .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY);
+        clusterCommunicator
+                .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED);
+        clusterCommunicator
+                .removeSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE);
+        messageHandlingExecutor.shutdown();
+        log.info("Stopped");
+    }
+
+    @Override
+    public boolean createDevicePool(DeviceId deviceId,
+                                    LabelResourceId beginLabel,
+                                    LabelResourceId endLabel) {
+        LabelResourcePool pool = new LabelResourcePool(deviceId.toString(),
+                                                       beginLabel.labelId(),
+                                                       endLabel.labelId());
+        return this.create(pool);
+    }
+
+    @Override
+    public boolean createGlobalPool(LabelResourceId beginLabel,
+                                    LabelResourceId endLabel) {
+        LabelResourcePool pool = new LabelResourcePool(
+                                                       GLOBAL_RESOURCE_POOL_DEVICE_ID,
+                                                       beginLabel.labelId(),
+                                                       endLabel.labelId());
+        return this.internalCreate(pool);
+    }
+
+    private boolean create(LabelResourcePool pool) {
+        Device device = (Device) deviceService.getDevice(pool.deviceId());
+        if (device == null) {
+            return false;
+        }
+
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(pool
+                .deviceId());
+
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("Failed to getFlowEntries: No master for {}", pool);
+            return false;
+        }
+
+        if (replicaInfo.master().get()
+                .equals(clusterService.getLocalNode().id())) {
+            return internalCreate(pool);
+        }
+
+        log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
+                  replicaInfo.master().orNull(), pool.deviceId());
+
+        return complete(clusterCommunicator
+                .sendAndReceive(pool,
+                                LabelResourceMessageSubjects.LABEL_POOL_CREATED,
+                                SERIALIZER::encode, SERIALIZER::decode,
+                                replicaInfo.master().get()));
+    }
+
+    private boolean internalCreate(LabelResourcePool pool) {
+        resourcePoolLock.writeLock().lock();
+        LabelResourcePool poolOld = resourcePool.get(pool.deviceId()).value();
+        if (poolOld == null) {
+            resourcePool.put(pool.deviceId(), pool);
+            resourcePoolLock.writeLock().unlock();
+            LabelResourceEvent event = new LabelResourceEvent(
+                                                              Type.POOL_CREATED,
+                                                              pool);
+            notifyDelegate(event);
+            return true;
+        }
+        resourcePoolLock.writeLock().unlock();
+        return false;
+    }
+
+    @Override
+    public boolean destroyDevicePool(DeviceId deviceId) {
+        Device device = (Device) deviceService.getDevice(deviceId);
+        if (device == null) {
+            return false;
+        }
+        ReplicaInfo replicaInfo = replicaInfoManager
+                .getReplicaInfoFor(deviceId);
+
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("Failed to getFlowEntries: No master for {}", deviceId);
+            return false;
+        }
+
+        if (replicaInfo.master().get()
+                .equals(clusterService.getLocalNode().id())) {
+            return internalDestroy(deviceId);
+        }
+
+        log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
+                  replicaInfo.master().orNull(), deviceId);
+
+        return complete(clusterCommunicator
+                .sendAndReceive(deviceId,
+                                LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
+                                SERIALIZER::encode, SERIALIZER::decode,
+                                replicaInfo.master().get()));
+    }
+
+    private boolean internalDestroy(DeviceId deviceId) {
+        LabelResourcePool poolOld = resourcePool.get(deviceId).value();
+        if (poolOld != null) {
+            resourcePool.remove(deviceId);
+            LabelResourceEvent event = new LabelResourceEvent(
+                                                              Type.POOL_CREATED,
+                                                              poolOld);
+            notifyDelegate(event);
+        }
+        log.info("success to destroy the label resource pool of device id {}",
+                 deviceId);
+        return true;
+    }
+
+    @Override
+    public Collection<LabelResource> applyFromDevicePool(DeviceId deviceId,
+                                                         long applyNum) {
+        Device device = (Device) deviceService.getDevice(deviceId);
+        if (device == null) {
+            return Collections.emptyList();
+        }
+        LabelResourceRequest request = new LabelResourceRequest(
+                                                                deviceId,
+                                                                LabelResourceRequest.Type.APPLY,
+                                                                applyNum, null);
+        ReplicaInfo replicaInfo = replicaInfoManager
+                .getReplicaInfoFor(deviceId);
+
+        if (!replicaInfo.master().isPresent()) {
+            log.warn("Failed to getFlowEntries: No master for {}", deviceId);
+            return Collections.emptyList();
+        }
+
+        if (replicaInfo.master().get()
+                .equals(clusterService.getLocalNode().id())) {
+            return internalApply(request);
+        }
+
+        log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
+                  replicaInfo.master().orNull(), deviceId);
+
+        return complete(clusterCommunicator
+                .sendAndReceive(request,
+                                LabelResourceMessageSubjects.LABEL_POOL_APPLY,
+                                SERIALIZER::encode, SERIALIZER::decode,
+                                replicaInfo.master().get()));
+    }
+
+    private Collection<LabelResource> internalApply(LabelResourceRequest request) {
+        resourcePoolLock.writeLock().lock();
+        DeviceId deviceId = request.deviceId();
+        long applyNum = request.applyNum();
+        LabelResourcePool pool = resourcePool.get(deviceId).value();
+        Collection<LabelResource> result = new HashSet<LabelResource>();
+        long freeNum = this.getFreeNumOfDevicePool(deviceId);
+        if (applyNum > freeNum) {
+            log.info("the free number of the label resource pool of deviceId {} is not enough.");
+            resourcePoolLock.writeLock().unlock();
+            return Collections.emptyList();
+        }
+        Set<LabelResource> releaseLabels = new HashSet<LabelResource>(
+                                                                      pool.releaseLabelId());
+        long tmp = releaseLabels.size() > applyNum ? applyNum : releaseLabels
+                .size();
+        LabelResource resource = null;
+        for (int i = 0; i < tmp; i++) {
+            Iterator<LabelResource> it = releaseLabels.iterator();
+            if (it.hasNext()) {
+                resource = it.next();
+                releaseLabels.remove(resource);
+            }
+            result.add(resource);
+        }
+        for (long j = pool.currentUsedMaxLabelId().labelId(); j < pool
+                .currentUsedMaxLabelId().labelId() + applyNum - tmp; j++) {
+            resource = new DefaultLabelResource(deviceId,
+                                                LabelResourceId
+                                                        .labelResourceId(j));
+            result.add(resource);
+        }
+        long beginLabel = pool.beginLabel().labelId();
+        long endLabel = pool.endLabel().labelId();
+        long totalNum = pool.totalNum();
+        long current = pool.currentUsedMaxLabelId().labelId() + applyNum - tmp;
+        long usedNum = pool.usedNum() + applyNum;
+        ImmutableSet<LabelResource> freeLabel = ImmutableSet
+                .copyOf(releaseLabels);
+        LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
+                                                          beginLabel, endLabel,
+                                                          totalNum, usedNum,
+                                                          current, freeLabel);
+        resourcePool.put(deviceId, newPool);
+        log.info("success to apply label resource");
+        resourcePoolLock.writeLock().unlock();
+        return result;
+    }
+
+    @Override
+    public boolean releaseToDevicePool(Multimap<DeviceId, LabelResource> release) {
+        Map<DeviceId, Collection<LabelResource>> maps = release.asMap();
+        Set<DeviceId> deviceIdSet = maps.keySet();
+        LabelResourceRequest request = null;
+        for (Iterator<DeviceId> it = deviceIdSet.iterator(); it.hasNext();) {
+            DeviceId deviceId = (DeviceId) it.next();
+            Device device = (Device) deviceService.getDevice(deviceId);
+            if (device == null) {
+                continue;
+            }
+            ImmutableSet<LabelResource> collection = ImmutableSet.copyOf(maps
+                    .get(deviceId));
+            request = new LabelResourceRequest(
+                                               deviceId,
+                                               LabelResourceRequest.Type.RELEASE,
+                                               0, collection);
+            ReplicaInfo replicaInfo = replicaInfoManager
+                    .getReplicaInfoFor(deviceId);
+
+            if (!replicaInfo.master().isPresent()) {
+                log.warn("Failed to getFlowEntries: No master for {}", deviceId);
+                return false;
+            }
+
+            if (replicaInfo.master().get()
+                    .equals(clusterService.getLocalNode().id())) {
+                return internalRelease(request);
+            }
+
+            log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
+                      replicaInfo.master().orNull(), deviceId);
+
+            return complete(clusterCommunicator
+                    .sendAndReceive(request,
+                                    LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
+                                    SERIALIZER::encode, SERIALIZER::decode,
+                                    replicaInfo.master().get()));
+        }
+        return false;
+    }
+
+    private boolean internalRelease(LabelResourceRequest request) {
+        resourcePoolLock.writeLock().lock();
+        DeviceId deviceId = request.deviceId();
+        Collection<LabelResource> release = request.releaseCollection();
+        LabelResourcePool pool = resourcePool.get(deviceId).value();
+        if (pool == null) {
+            resourcePoolLock.writeLock().unlock();
+            log.info("the label resource pool of device id {} does not exist");
+            return false;
+        }
+        Set<LabelResource> storeSet = new HashSet<LabelResource>(
+                                                                 pool.releaseLabelId());
+        LabelResource labelResource = null;
+        long realReleasedNum = 0;
+        for (Iterator<LabelResource> it = release.iterator(); it.hasNext();) {
+            labelResource = it.next();
+            if (labelResource.labelResourceId().labelId() < pool.beginLabel()
+                    .labelId()
+                    || labelResource.labelResourceId().labelId() > pool
+                            .endLabel().labelId()) {
+                continue;
+            }
+            if (pool.currentUsedMaxLabelId().labelId() > labelResource
+                    .labelResourceId().labelId()
+                    || !storeSet.contains(labelResource)) {
+                storeSet.add(labelResource);
+                realReleasedNum++;
+            }
+        }
+        long beginNum = pool.beginLabel().labelId();
+        long endNum = pool.endLabel().labelId();
+        long totalNum = pool.totalNum();
+        long usedNum = pool.usedNum() - realReleasedNum;
+        long current = pool.currentUsedMaxLabelId().labelId();
+        ImmutableSet<LabelResource> s = ImmutableSet.copyOf(storeSet);
+        LabelResourcePool newPool = new LabelResourcePool(deviceId.toString(),
+                                                          beginNum, endNum,
+                                                          totalNum, usedNum,
+                                                          current, s);
+        resourcePool.put(deviceId, newPool);
+        log.info("success to release label resource");
+        resourcePoolLock.writeLock().unlock();
+        return true;
+    }
+
+    @Override
+    public boolean isDevicePoolFull(DeviceId deviceId) {
+        LabelResourcePool pool = resourcePool.get(deviceId).value();
+        if (pool == null) {
+            return true;
+        }
+        return pool.currentUsedMaxLabelId() == pool.endLabel()
+                && pool.releaseLabelId().size() == 0 ? true : false;
+    }
+
+    @Override
+    public long getFreeNumOfDevicePool(DeviceId deviceId) {
+        LabelResourcePool pool = resourcePool.get(deviceId).value();
+        if (pool == null) {
+            return 0;
+        }
+        return pool.endLabel().labelId()
+                - pool.currentUsedMaxLabelId().labelId()
+                + pool.releaseLabelId().size();
+    }
+
+    @Override
+    public LabelResourcePool getDeviceLabelResourcePool(DeviceId deviceId) {
+        return resourcePool.get(deviceId).value();
+    }
+
+    @Override
+    public boolean destroyGlobalPool() {
+        return this.internalDestroy(DeviceId
+                .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
+    }
+
+    @Override
+    public Collection<LabelResource> applyFromGlobalPool(long applyNum) {
+        LabelResourceRequest request = new LabelResourceRequest(
+                                                                DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
+                                                                LabelResourceRequest.Type.APPLY,
+                                                                applyNum, null);
+        return this.internalApply(request);
+    }
+
+    @Override
+    public boolean releaseToGlobalPool(Set<LabelResourceId> release) {
+        Set<LabelResource> set = new HashSet<LabelResource>();
+        DefaultLabelResource resource = null;
+        for (LabelResourceId labelResource : release) {
+            resource = new DefaultLabelResource(
+                                                DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
+                                                labelResource);
+            set.add(resource);
+        }
+        LabelResourceRequest request = new LabelResourceRequest(
+                                                                DeviceId.deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID),
+                                                                LabelResourceRequest.Type.APPLY,
+                                                                0,
+                                                                ImmutableSet
+                                                                        .copyOf(set));
+        return this.internalRelease(request);
+    }
+
+    @Override
+    public boolean isGlobalPoolFull() {
+        return this.isDevicePoolFull(DeviceId
+                .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
+    }
+
+    @Override
+    public long getFreeNumOfGlobalPool() {
+        return this.getFreeNumOfDevicePool(DeviceId
+                .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
+    }
+
+    @Override
+    public LabelResourcePool getGlobalLabelResourcePool() {
+        return this.getDeviceLabelResourcePool(DeviceId
+                .deviceId(GLOBAL_RESOURCE_POOL_DEVICE_ID));
+    }
+
+    private <T> T complete(Future<T> future) {
+        try {
+            return future.get(PEER_REQUEST_TIMEOUT_MS,
+                                                TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("Interrupted while waiting for operation to complete.", e);
+            return null;
+        } catch (TimeoutException | ExecutionException e) {
+            log.error("Failed remote operation", e);
+            return null;
+        }
+    }
+}