DistributedLinkResourceStore
Change-Id: Ia45c221946693906c12d64f20f25e30786a04224
diff --git a/core/api/src/main/java/org/onlab/onos/net/resource/Bandwidth.java b/core/api/src/main/java/org/onlab/onos/net/resource/Bandwidth.java
index fcb3cd8..8b84059 100644
--- a/core/api/src/main/java/org/onlab/onos/net/resource/Bandwidth.java
+++ b/core/api/src/main/java/org/onlab/onos/net/resource/Bandwidth.java
@@ -17,6 +17,7 @@
import java.util.Objects;
+// FIXME: Document what is the unit? Mbps?
/**
* Representation of bandwidth resource.
*/
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java
index 59f36ed..e6ff26e 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java
@@ -49,6 +49,10 @@
.toString();
}
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
/**
* Builder for BatchWriteRequest.
*/
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
index 3234bea..bc2f527 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
@@ -64,13 +64,13 @@
*
* @param tableName name of the table
* @param key key in the table
- * @param newValue value to write, must not be null
* @param oldValue previous value expected, must not be null
+ * @param newValue value to write, must not be null
* @return WriteRequest
*/
public static WriteRequest putIfValueMatches(String tableName, String key,
- byte[] newValue,
- byte[] oldValue) {
+ byte[] oldValue,
+ byte[] newValue) {
return new WriteRequest(PUT_IF_VALUE, tableName, key,
checkNotNull(newValue), ANY_VERSION,
checkNotNull(oldValue));
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/DistributedLinkResourceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/DistributedLinkResourceStore.java
index d05620d..b1cc6a8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/DistributedLinkResourceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/DistributedLinkResourceStore.java
@@ -18,9 +18,13 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.Link;
+import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.intent.IntentId;
+import org.onlab.onos.net.link.LinkService;
import org.onlab.onos.net.resource.Bandwidth;
import org.onlab.onos.net.resource.BandwidthResourceAllocation;
import org.onlab.onos.net.resource.Lambda;
@@ -29,34 +33,88 @@
import org.onlab.onos.net.resource.LinkResourceStore;
import org.onlab.onos.net.resource.ResourceAllocation;
import org.onlab.onos.net.resource.ResourceType;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.StoreSerializer;
+import org.onlab.onos.store.service.BatchWriteRequest;
+import org.onlab.onos.store.service.BatchWriteRequest.Builder;
+import org.onlab.onos.store.service.BatchWriteResult;
+import org.onlab.onos.store.service.DatabaseAdminService;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.VersionedValue;
+import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.WriteResult;
import org.slf4j.Logger;
-import java.util.Collections;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Predicates.notNull;
+import static org.onlab.util.HexString.toHexString;
import static org.slf4j.LoggerFactory.getLogger;
/**
- * Manages link resources using trivial in-memory structures implementation.
+ * Manages link resources using database service.
*/
@Component(immediate = true)
@Service
public class DistributedLinkResourceStore implements LinkResourceStore {
+
private final Logger log = getLogger(getClass());
- private Map<IntentId, LinkResourceAllocations> linkResourceAllocationsMap;
- private Map<Link, Set<LinkResourceAllocations>> allocatedResources;
- private Map<Link, Set<ResourceAllocation>> freeResources;
+
+ // FIXME: what is the Bandwidth unit?
+ private static final Bandwidth DEFAULT_BANDWIDTH = Bandwidth.valueOf(1_000);
+
+ // table to store current allocations
+ /** LinkKey -> List<LinkResourceAllocations>. */
+ private static final String LINK_RESOURCE_ALLOCATIONS = "LinkResourceAllocations";
+
+ /** IntentId -> LinkResourceAllocations. */
+ private static final String INTENT_ALLOCATIONS = "IntentAllocations";
+
+ private static final Bandwidth EMPTY_BW = Bandwidth.valueOf(0);
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DatabaseAdminService databaseAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DatabaseService databaseService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LinkService linkService;
+
+ // Link annotation key name to use as bandwidth
+ private String bandwidthAnnotation = "bandwidth";
+ // Link annotation key name to use as max lamda
+ private String wavesAnnotation = "optical.waves";
+
+ private StoreSerializer serializer;
+
@Activate
public void activate() {
- linkResourceAllocationsMap = new HashMap<>();
- allocatedResources = new HashMap<>();
- freeResources = new HashMap<>();
+
+ serializer = new KryoSerializer();
+
+ Set<String> tables = databaseAdminService.listTables();
+ if (!tables.contains(LINK_RESOURCE_ALLOCATIONS)) {
+ databaseAdminService.createTable(LINK_RESOURCE_ALLOCATIONS);
+ }
+ if (!tables.contains(INTENT_ALLOCATIONS)) {
+ databaseAdminService.createTable(INTENT_ALLOCATIONS);
+ }
log.info("Started");
}
@@ -66,167 +124,431 @@
log.info("Stopped");
}
- /**
- * Returns free resources for a given link obtaining from topology
- * information.
- *
- * @param link the target link
- * @return free resources
- */
- private synchronized Set<ResourceAllocation> readOriginalFreeResources(Link link) {
- // TODO read capacity and lambda resources from topology
- Set<ResourceAllocation> allocations = new HashSet<>();
- for (int i = 1; i <= 100; i++) {
- allocations.add(new LambdaResourceAllocation(Lambda.valueOf(i)));
+ private Set<? extends ResourceAllocation> getResourceCapacity(ResourceType type, Link link) {
+ // TODO: plugin/provider mechanism to add resource type in the future?
+ if (type == ResourceType.BANDWIDTH) {
+ return ImmutableSet.of(getBandwidthResourceCapacity(link));
}
- allocations.add(new BandwidthResourceAllocation(Bandwidth.valueOf(1000000)));
+ if (type == ResourceType.LAMBDA) {
+ return getLambdaResourceCapacity(link);
+ }
+ return null;
+ }
+
+ private Set<LambdaResourceAllocation> getLambdaResourceCapacity(Link link) {
+ // FIXME enumerate all the possible link/port lambdas
+ Set<LambdaResourceAllocation> allocations = new HashSet<>();
+ try {
+ final int waves = Integer.parseInt(link.annotations().value(wavesAnnotation));
+ for (int i = 1; i <= waves; i++) {
+ allocations.add(new LambdaResourceAllocation(Lambda.valueOf(i)));
+ }
+ } catch (NumberFormatException e) {
+ log.debug("No {} annotation on link %s", wavesAnnotation, link);
+ }
return allocations;
}
- /**
- * Finds and returns {@link org.onlab.onos.net.resource.BandwidthResourceAllocation} object from a given
- * set.
- *
- * @param freeRes a set of ResourceAllocation object.
- * @return {@link org.onlab.onos.net.resource.BandwidthResourceAllocation} object if found, otherwise
- * {@link org.onlab.onos.net.resource.BandwidthResourceAllocation} object with 0 bandwidth
- *
- */
- private synchronized BandwidthResourceAllocation getBandwidth(Set<ResourceAllocation> freeRes) {
- for (ResourceAllocation res : freeRes) {
- if (res.type() == ResourceType.BANDWIDTH) {
- return (BandwidthResourceAllocation) res;
+ private BandwidthResourceAllocation getBandwidthResourceCapacity(Link link) {
+
+ // if Link annotation exist, use them
+ // if all fails, use DEFAULT_BANDWIDTH
+
+ Bandwidth bandwidth = null;
+ String strBw = link.annotations().value(bandwidthAnnotation);
+ if (strBw != null) {
+ try {
+ bandwidth = Bandwidth.valueOf(Double.parseDouble(strBw));
+ } catch (NumberFormatException e) {
+ // do nothings
+ bandwidth = null;
}
}
- return new BandwidthResourceAllocation(Bandwidth.valueOf(0));
+
+ if (bandwidth == null) {
+ // fall back, use fixed default
+ bandwidth = DEFAULT_BANDWIDTH;
+ }
+ return new BandwidthResourceAllocation(bandwidth);
}
- /**
- * Subtracts given resources from free resources for given link.
- *
- * @param link the target link
- * @param allocations the resources to be subtracted
- */
- private synchronized void subtractFreeResources(Link link, LinkResourceAllocations allocations) {
- // TODO Use lock or version for updating freeResources.
+ private Map<ResourceType, Set<? extends ResourceAllocation>> getResourceCapacity(Link link) {
+ Map<ResourceType, Set<? extends ResourceAllocation>> caps = new HashMap<>();
+ for (ResourceType type : ResourceType.values()) {
+ Set<? extends ResourceAllocation> cap = getResourceCapacity(type, link);
+ if (cap != null) {
+ caps.put(type, cap);
+ }
+ }
+ return caps;
+ }
+
+ @Override
+ public Set<ResourceAllocation> getFreeResources(Link link) {
+ Map<ResourceType, Set<? extends ResourceAllocation>> freeResources = getFreeResourcesEx(link);
+ Set<ResourceAllocation> allFree = new HashSet<>();
+ for (Set<? extends ResourceAllocation> r:freeResources.values()) {
+ allFree.addAll(r);
+ }
+ return allFree;
+ }
+
+ private Map<ResourceType, Set<? extends ResourceAllocation>> getFreeResourcesEx(Link link) {
+ // returns capacity - allocated
+
checkNotNull(link);
- Set<ResourceAllocation> freeRes = new HashSet<>(getFreeResources(link));
- Set<ResourceAllocation> subRes = allocations.getResourceAllocation(link);
- for (ResourceAllocation res : subRes) {
- switch (res.type()) {
+ Map<ResourceType, Set<? extends ResourceAllocation>> free = new HashMap<>();
+ final Map<ResourceType, Set<? extends ResourceAllocation>> caps = getResourceCapacity(link);
+ final Iterable<LinkResourceAllocations> allocations = getAllocations(link);
+
+ for (ResourceType type : ResourceType.values()) {
+ // there should be class/category of resources
+ switch (type) {
case BANDWIDTH:
- BandwidthResourceAllocation ba = getBandwidth(freeRes);
- double requestedBandwidth =
- ((BandwidthResourceAllocation) res).bandwidth().toDouble();
- double newBandwidth = ba.bandwidth().toDouble() - requestedBandwidth;
- checkState(newBandwidth >= 0.0);
- freeRes.remove(ba);
- freeRes.add(new BandwidthResourceAllocation(
- Bandwidth.valueOf(newBandwidth)));
+ {
+ Set<? extends ResourceAllocation> bw = caps.get(ResourceType.BANDWIDTH);
+ if (bw == null || bw.isEmpty()) {
+ bw = Sets.newHashSet(new BandwidthResourceAllocation(EMPTY_BW));
+ }
+
+ BandwidthResourceAllocation cap = (BandwidthResourceAllocation) bw.iterator().next();
+ double freeBw = cap.bandwidth().toDouble();
+
+ // enumerate current allocations, subtracting resources
+ for (LinkResourceAllocations alloc : allocations) {
+ Set<ResourceAllocation> types = alloc.getResourceAllocation(link);
+ for (ResourceAllocation a : types) {
+ if (a instanceof BandwidthResourceAllocation) {
+ BandwidthResourceAllocation bwA = (BandwidthResourceAllocation) a;
+ freeBw -= bwA.bandwidth().toDouble();
+ }
+ }
+ }
+
+ free.put(type, Sets.newHashSet(new BandwidthResourceAllocation(Bandwidth.valueOf(freeBw))));
break;
+ }
+
case LAMBDA:
- checkState(freeRes.remove(res));
+ {
+ Set<? extends ResourceAllocation> lmd = caps.get(type);
+ if (lmd == null || lmd.isEmpty()) {
+ // nothing left
+ break;
+ }
+ Set<LambdaResourceAllocation> freeL = new HashSet<>();
+ for (ResourceAllocation r : lmd) {
+ if (r instanceof LambdaResourceAllocation) {
+ freeL.add((LambdaResourceAllocation) r);
+ }
+ }
+
+ // enumerate current allocations, removing resources
+ for (LinkResourceAllocations alloc : allocations) {
+ Set<ResourceAllocation> types = alloc.getResourceAllocation(link);
+ for (ResourceAllocation a : types) {
+ if (a instanceof LambdaResourceAllocation) {
+ freeL.remove(a);
+ }
+ }
+ }
+
+ free.put(type, freeL);
break;
+ }
+
default:
break;
}
}
- freeResources.put(link, freeRes);
-
+ return free;
}
- /**
- * Adds given resources to free resources for given link.
- *
- * @param link the target link
- * @param allocations the resources to be added
- */
- private synchronized void addFreeResources(Link link, LinkResourceAllocations allocations) {
- // TODO Use lock or version for updating freeResources.
- Set<ResourceAllocation> freeRes = new HashSet<>(getFreeResources(link));
- Set<ResourceAllocation> addRes = allocations.getResourceAllocation(link);
- for (ResourceAllocation res : addRes) {
- switch (res.type()) {
- case BANDWIDTH:
- BandwidthResourceAllocation ba = getBandwidth(freeRes);
- double requestedBandwidth =
- ((BandwidthResourceAllocation) res).bandwidth().toDouble();
- double newBandwidth = ba.bandwidth().toDouble() + requestedBandwidth;
- freeRes.remove(ba);
- freeRes.add(new BandwidthResourceAllocation(
- Bandwidth.valueOf(newBandwidth)));
+ private LinkResourceAllocations getIntentAllocations(IntentId id) {
+ VersionedValue vv
+ = databaseService.get(INTENT_ALLOCATIONS, toIntentDbKey(checkNotNull(id)));
+ if (vv == null || vv.value() == null) {
+ return null;
+ }
+ return decodeIntentAllocations(vv.value());
+ }
+
+ private Builder putIntentAllocations(Builder ctx,
+ IntentId id,
+ LinkResourceAllocations alloc) {
+ return ctx.put(INTENT_ALLOCATIONS,
+ toIntentDbKey(id),
+ encodeIntentAllocations(alloc));
+ }
+
+
+ @Override
+ public void allocateResources(LinkResourceAllocations allocations) {
+ checkNotNull(allocations);
+
+ Builder tx = BatchWriteRequest.newBuilder();
+
+ // TODO: Should IntentId -> Allocation be updated conditionally?
+ putIntentAllocations(tx, allocations.intendId(), allocations);
+
+ for (Link link : allocations.links()) {
+ allocateLinkResource(tx, link, allocations);
+ }
+
+ BatchWriteRequest batch = tx.build();
+// log.info("Intent: {}", databaseService.getAll(INTENT_ALLOCATIONS));
+// log.info("Link: {}", databaseService.getAll(LINK_RESOURCE_ALLOCATIONS));
+
+ BatchWriteResult result = databaseService.batchWrite(batch);
+ if (!result.isSuccessful()) {
+ log.error("Allocation Failed.");
+ if (log.isDebugEnabled()) {
+ logFailureDetail(batch, result);
+ }
+ // FIXME throw appropriate exception, with what failed.
+ checkState(result.isSuccessful(), "Allocation failed");
+ }
+ }
+
+ private void logFailureDetail(BatchWriteRequest batch,
+ BatchWriteResult result) {
+ for (int i = 0; i < batch.batchSize(); ++i) {
+ final WriteRequest req = batch.getAsList().get(i);
+ final WriteResult fail = result.getAsList().get(i);
+ switch (fail.status()) {
+ case ABORTED:
+ log.debug("ABORTED: {}@{}", req.key(), req.tableName());
break;
- case LAMBDA:
- checkState(freeRes.add(res));
+ case PRECONDITION_VIOLATION:
+ switch (req.type()) {
+ case PUT_IF_ABSENT:
+ log.debug("{}: {}@{} : {}", req.type(),
+ req.key(), req.tableName(), fail.previousValue());
+ break;
+ case PUT_IF_VALUE:
+ case REMOVE_IF_VALUE:
+ log.debug("{}: {}@{} : was {}, expected {}", req.type(),
+ req.key(), req.tableName(),
+ fail.previousValue(),
+ toHexString(req.oldValue()));
+ break;
+ case PUT_IF_VERSION:
+ case REMOVE_IF_VERSION:
+ log.debug("{}: {}@{} : was {}, expected {}", req.type(),
+ req.key(), req.tableName(),
+ fail.previousValue().version(),
+ req.previousVersion());
+ break;
+ default:
+ log.error("Should never reach here.");
+ break;
+ }
break;
default:
+ log.error("Should never reach here.");
break;
}
}
- freeResources.put(link, freeRes);
}
- @Override
- public synchronized Set<ResourceAllocation> getFreeResources(Link link) {
- checkNotNull(link);
- Set<ResourceAllocation> freeRes = freeResources.get(link);
- if (freeRes == null) {
- freeRes = readOriginalFreeResources(link);
- }
+ private Builder allocateLinkResource(Builder builder, Link link,
+ LinkResourceAllocations allocations) {
- return freeRes;
- }
+ // requested resources
+ Set<ResourceAllocation> reqs = allocations.getResourceAllocation(link);
- @Override
- public synchronized void allocateResources(LinkResourceAllocations allocations) {
- checkNotNull(allocations);
- linkResourceAllocationsMap.put(allocations.intendId(), allocations);
- for (Link link : allocations.links()) {
- subtractFreeResources(link, allocations);
- Set<LinkResourceAllocations> linkAllocs = allocatedResources.get(link);
- if (linkAllocs == null) {
- linkAllocs = new HashSet<>();
+ Map<ResourceType, Set<? extends ResourceAllocation>> available = getFreeResourcesEx(link);
+ for (ResourceAllocation req : reqs) {
+ Set<? extends ResourceAllocation> avail = available.get(req.type());
+ if (req instanceof BandwidthResourceAllocation) {
+ // check if allocation should be accepted
+ if (avail.isEmpty()) {
+ checkState(!avail.isEmpty(),
+ "There's no Bandwidth resource on %s?",
+ link);
+ }
+ BandwidthResourceAllocation bw = (BandwidthResourceAllocation) avail.iterator().next();
+ double bwLeft = bw.bandwidth().toDouble();
+ bwLeft -= ((BandwidthResourceAllocation) req).bandwidth().toDouble();
+ if (bwLeft < 0) {
+ // FIXME throw appropriate Exception
+ checkState(bwLeft >= 0,
+ "There's no Bandwidth left on %s. %s",
+ link, bwLeft);
+ }
+ } else if (req instanceof LambdaResourceAllocation) {
+
+ // check if allocation should be accepted
+ if (!avail.contains(req)) {
+ // requested lambda was not available
+ // FIXME throw appropriate exception
+ checkState(avail.contains(req),
+ "Allocating %s on %s failed",
+ req, link);
+ }
}
- linkAllocs.add(allocations);
- allocatedResources.put(link, linkAllocs);
}
+ // all requests allocatable => add allocation
+ final List<LinkResourceAllocations> before = getAllocations(link);
+ List<LinkResourceAllocations> after = new ArrayList<>(before.size());
+ after.addAll(before);
+ after.add(allocations);
+ replaceLinkAllocations(builder, LinkKey.linkKey(link), before, after);
+ return builder;
+ }
+
+ private Builder replaceLinkAllocations(Builder builder, LinkKey linkKey,
+ List<LinkResourceAllocations> before,
+ List<LinkResourceAllocations> after) {
+
+ byte[] oldValue = encodeLinkAllocations(before);
+ byte[] newValue = encodeLinkAllocations(after);
+ builder.putIfValueMatches(LINK_RESOURCE_ALLOCATIONS, toLinkDbKey(linkKey), oldValue, newValue);
+ return builder;
}
@Override
- public synchronized void releaseResources(LinkResourceAllocations allocations) {
+ public void releaseResources(LinkResourceAllocations allocations) {
checkNotNull(allocations);
- linkResourceAllocationsMap.remove(allocations.intendId());
- for (Link link : allocations.links()) {
- addFreeResources(link, allocations);
- Set<LinkResourceAllocations> linkAllocs = allocatedResources.get(link);
- if (linkAllocs == null) {
- log.error("Missing resource allocation.");
- } else {
- linkAllocs.remove(allocations);
+
+ final IntentId intendId = allocations.intendId();
+ final String dbIntentId = toIntentDbKey(intendId);
+ final Collection<Link> links = allocations.links();
+
+ // TODO: does release must happen in a batch?
+ boolean success;
+ do {
+ Builder tx = BatchWriteRequest.newBuilder();
+
+ // TODO: Should IntentId -> Allocation be updated conditionally?
+ tx.remove(INTENT_ALLOCATIONS, dbIntentId);
+
+ for (Link link : links) {
+ final LinkKey linkId = LinkKey.linkKey(link);
+ final String dbLinkId = toLinkDbKey(linkId);
+ VersionedValue vv = databaseService.get(LINK_RESOURCE_ALLOCATIONS, dbLinkId);
+ if (vv == null || vv.value() == null) {
+ // something is wrong, but it is already freed
+ log.warn("There was no resource left to release on {}", linkId);
+ continue;
+ }
+ List<LinkResourceAllocations> before = decodeLinkAllocations(vv.value());
+ List<LinkResourceAllocations> after = new ArrayList<>(before);
+ after.remove(allocations);
+ byte[] oldValue = encodeLinkAllocations(before);
+ byte[] newValue = encodeLinkAllocations(after);
+ tx.putIfValueMatches(LINK_RESOURCE_ALLOCATIONS, dbLinkId, oldValue, newValue);
}
- allocatedResources.put(link, linkAllocs);
- }
+
+ BatchWriteResult batchWrite = databaseService.batchWrite(tx.build());
+ success = batchWrite.isSuccessful();
+ } while (!success);
}
@Override
- public synchronized LinkResourceAllocations getAllocations(IntentId intentId) {
+ public LinkResourceAllocations getAllocations(IntentId intentId) {
checkNotNull(intentId);
- return linkResourceAllocationsMap.get(intentId);
- }
-
- @Override
- public synchronized Iterable<LinkResourceAllocations> getAllocations(Link link) {
- checkNotNull(link);
- Set<LinkResourceAllocations> result = allocatedResources.get(link);
- if (result == null) {
- result = Collections.emptySet();
+ VersionedValue vv = databaseService.get(INTENT_ALLOCATIONS, toIntentDbKey(intentId));
+ if (vv == null) {
+ // FIXME: should we return null or LinkResourceAllocations with nothing allocated?
+ return null;
}
- return Collections.unmodifiableSet(result);
+ LinkResourceAllocations allocations = decodeIntentAllocations(vv.value());
+ return allocations;
+ }
+
+ private String toLinkDbKey(LinkKey linkid) {
+ // introduce cache if necessary
+ return linkid.toString();
+ // TODO: Above is irreversible, if we need reverse conversion
+ // we may need something like below, due to String only limitation
+// byte[] bytes = serializer.encode(linkid);
+// StringBuilder builder = new StringBuilder(bytes.length * 4);
+// boolean isFirst = true;
+// for (byte b : bytes) {
+// if (!isFirst) {
+// builder.append(',');
+// }
+// builder.append(b);
+// isFirst = false;
+// }
+// return builder.toString();
+ }
+
+// private LinkKey toLinkKey(String linkKey) {
+// String[] bytes = linkKey.split(",");
+// ByteBuffer buf = ByteBuffer.allocate(bytes.length);
+// for (String bs : bytes) {
+// buf.put(Byte.parseByte(bs));
+// }
+// buf.flip();
+// return serializer.decode(buf);
+// }
+
+ private String toIntentDbKey(IntentId intentid) {
+ return intentid.toString();
+ }
+
+ private IntentId toIntentId(String intentid) {
+ checkArgument(intentid.startsWith("0x"));
+ return IntentId.valueOf(Long.parseLong(intentid.substring(2)));
+ }
+
+ private LinkResourceAllocations decodeIntentAllocations(byte[] bytes) {
+ return serializer.decode(bytes);
+ }
+
+ private byte[] encodeIntentAllocations(LinkResourceAllocations alloc) {
+ return serializer.encode(checkNotNull(alloc));
+ }
+
+ private List<LinkResourceAllocations> decodeLinkAllocations(byte[] bytes) {
+ return serializer.decode(bytes);
+ }
+
+ private byte[] encodeLinkAllocations(List<LinkResourceAllocations> alloc) {
+ return serializer.encode(checkNotNull(alloc));
}
@Override
- public synchronized Iterable<LinkResourceAllocations> getAllocations() {
- return Collections.unmodifiableCollection(linkResourceAllocationsMap.values());
+ public List<LinkResourceAllocations> getAllocations(Link link) {
+ checkNotNull(link);
+ final LinkKey key = LinkKey.linkKey(link);
+ final String dbKey = toLinkDbKey(key);
+ VersionedValue vv = databaseService.get(LINK_RESOURCE_ALLOCATIONS, dbKey);
+ if (vv == null) {
+ // write empty so that all other update can be replace operation
+ byte[] emptyList = encodeLinkAllocations(new ArrayList<>());
+ boolean written = databaseService.putIfAbsent(LINK_RESOURCE_ALLOCATIONS, dbKey, emptyList);
+ log.trace("Empty allocation write success? {}", written);
+ vv = databaseService.get(LINK_RESOURCE_ALLOCATIONS, dbKey);
+ if (vv == null) {
+ log.error("Failed to re-read allocation for {}", dbKey);
+ // note: cannot be Collections.emptyList();
+ return new ArrayList<>();
+ }
+ }
+ List<LinkResourceAllocations> allocations = decodeLinkAllocations(vv.value());
+ return allocations;
+ }
+
+ @Override
+ public Iterable<LinkResourceAllocations> getAllocations() {
+ //IntentId -> LinkResourceAllocations
+ Map<String, VersionedValue> all = databaseService.getAll(INTENT_ALLOCATIONS);
+
+ return FluentIterable.from(all.values())
+ .transform(new Function<VersionedValue, LinkResourceAllocations>() {
+
+ @Override
+ public LinkResourceAllocations apply(VersionedValue input) {
+ if (input == null || input.value() == null) {
+ return null;
+ }
+ return decodeIntentAllocations(input.value());
+ }
+ })
+ .filter(notNull());
}
}