ECLinkStore: LinkStore based on EventuallyConsistentMap (disabled right now)
Change-Id: Ib271ad6da90eb8b4d39db160e13c84b7bb695c9b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java
new file mode 100644
index 0000000..ac7f6bc
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/ECLinkStore.java
@@ -0,0 +1,384 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.link.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.onosproject.net.DefaultAnnotations.union;
+import static org.onosproject.net.Link.State.ACTIVE;
+import static org.onosproject.net.Link.State.INACTIVE;
+import static org.onosproject.net.Link.Type.DIRECT;
+import static org.onosproject.net.Link.Type.INDIRECT;
+import static org.onosproject.net.LinkKey.linkKey;
+import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
+import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
+import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SharedExecutors;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.AnnotationsUtil;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultLink;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Link;
+import org.onosproject.net.LinkKey;
+import org.onosproject.net.Link.Type;
+import org.onosproject.net.device.DeviceClockService;
+import org.onosproject.net.link.DefaultLinkDescription;
+import org.onosproject.net.link.LinkDescription;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.link.LinkStore;
+import org.onosproject.net.link.LinkStoreDelegate;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.impl.MastershipBasedTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+
+/**
+ * Manages the inventory of links using a {@code EventuallyConsistentMap}.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class ECLinkStore
+ extends AbstractStore<LinkEvent, LinkStoreDelegate>
+ implements LinkStore {
+
+ private final Logger log = getLogger(getClass());
+
+ private final Map<LinkKey, Link> links = Maps.newConcurrentMap();
+ private EventuallyConsistentMap<Provided<LinkKey>, LinkDescription> linkDescriptions;
+
+ private static final MessageSubject LINK_INJECT_MESSAGE = new MessageSubject("inject-link-request");
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceClockService deviceClockService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ private EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> linkTracker =
+ new InternalLinkTracker();
+
+ protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(DistributedStoreSerializers.STORE_COMMON)
+ .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
+ .register(Provided.class)
+ .build();
+ }
+ };
+
+ @Activate
+ public void activate() {
+ KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(MastershipBasedTimestamp.class)
+ .register(Provided.class);
+
+ linkDescriptions = storageService.<Provided<LinkKey>, LinkDescription>eventuallyConsistentMapBuilder()
+ .withName("onos-link-descriptions")
+ .withSerializer(serializer)
+ .withTimestampProvider((k, v) -> {
+ try {
+ return v == null ? null : deviceClockService.getTimestamp(v.dst().deviceId());
+ } catch (IllegalStateException e) {
+ return null;
+ }
+ }).build();
+
+ clusterCommunicator.addSubscriber(LINK_INJECT_MESSAGE,
+ SERIALIZER::decode,
+ this::injectLink,
+ SharedExecutors.getPoolThreadExecutor());
+
+ linkDescriptions.addListener(linkTracker);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ linkDescriptions.removeListener(linkTracker);
+ linkDescriptions.destroy();
+ links.clear();
+ clusterCommunicator.removeSubscriber(LINK_INJECT_MESSAGE);
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public int getLinkCount() {
+ return links.size();
+ }
+
+ @Override
+ public Iterable<Link> getLinks() {
+ return links.values();
+ }
+
+ @Override
+ public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
+ return filter(links.values(), link -> deviceId.equals(link.src().deviceId()));
+ }
+
+ @Override
+ public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
+ return filter(links.values(), link -> deviceId.equals(link.dst().deviceId()));
+ }
+
+ @Override
+ public Link getLink(ConnectPoint src, ConnectPoint dst) {
+ return links.get(linkKey(src, dst));
+ }
+
+ @Override
+ public Set<Link> getEgressLinks(ConnectPoint src) {
+ return filter(links.values(), link -> src.equals(link.src()));
+ }
+
+ @Override
+ public Set<Link> getIngressLinks(ConnectPoint dst) {
+ return filter(links.values(), link -> dst.equals(link.dst()));
+ }
+
+ @Override
+ public LinkEvent createOrUpdateLink(ProviderId providerId,
+ LinkDescription linkDescription) {
+ final DeviceId dstDeviceId = linkDescription.dst().deviceId();
+ final NodeId dstNodeId = mastershipService.getMasterFor(dstDeviceId);
+
+ // Process link update only if we're the master of the destination node,
+ // otherwise signal the actual master.
+ if (clusterService.getLocalNode().id().equals(dstNodeId)) {
+ LinkKey linkKey = linkKey(linkDescription.src(), linkDescription.dst());
+ Provided<LinkKey> internalLinkKey = new Provided<>(linkKey, providerId);
+ linkDescriptions.compute(internalLinkKey, (k, v) -> createOrUpdateLinkInternal(v , linkDescription));
+ return refreshLinkCache(linkKey);
+ } else {
+ if (dstNodeId == null) {
+ return null;
+ }
+ return Futures.getUnchecked(clusterCommunicator.sendAndReceive(new Provided<>(linkDescription, providerId),
+ LINK_INJECT_MESSAGE,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ dstNodeId));
+ }
+ }
+
+ private LinkDescription createOrUpdateLinkInternal(LinkDescription current, LinkDescription updated) {
+ if (current != null) {
+ // we only allow transition from INDIRECT -> DIRECT
+ return new DefaultLinkDescription(
+ current.src(),
+ current.dst(),
+ current.type() == DIRECT ? DIRECT : updated.type(),
+ union(current.annotations(), updated.annotations()));
+ }
+ return updated;
+ }
+
+ private LinkEvent refreshLinkCache(LinkKey linkKey) {
+ AtomicReference<LinkEvent.Type> eventType = new AtomicReference<>();
+ Link link = links.compute(linkKey, (key, existingLink) -> {
+ Link newLink = composeLink(linkKey);
+ if (existingLink == null) {
+ eventType.set(LINK_ADDED);
+ return newLink;
+ } else if (existingLink.state() != newLink.state() ||
+ (existingLink.type() == INDIRECT && newLink.type() == DIRECT) ||
+ !AnnotationsUtil.isEqual(existingLink.annotations(), newLink.annotations())) {
+ eventType.set(LINK_UPDATED);
+ return newLink;
+ } else {
+ return existingLink;
+ }
+ });
+ return eventType.get() != null ? new LinkEvent(eventType.get(), link) : null;
+ }
+
+ private Set<ProviderId> getAllProviders(LinkKey linkKey) {
+ return linkDescriptions.keySet()
+ .stream()
+ .filter(key -> key.key().equals(linkKey))
+ .map(key -> key.providerId())
+ .collect(Collectors.toSet());
+ }
+
+ private ProviderId getBaseProviderId(LinkKey linkKey) {
+ Set<ProviderId> allProviders = getAllProviders(linkKey);
+ if (allProviders.size() > 0) {
+ return allProviders.stream()
+ .filter(p -> !p.isAncillary())
+ .findFirst()
+ .orElse(Iterables.getFirst(allProviders, null));
+ }
+ return null;
+ }
+
+ private Link composeLink(LinkKey linkKey) {
+
+ ProviderId baseProviderId = checkNotNull(getBaseProviderId(linkKey));
+ LinkDescription base = linkDescriptions.get(new Provided<>(linkKey, baseProviderId));
+
+ ConnectPoint src = base.src();
+ ConnectPoint dst = base.dst();
+ Type type = base.type();
+ AtomicReference<DefaultAnnotations> annotations = new AtomicReference<>(DefaultAnnotations.builder().build());
+ annotations.set(merge(annotations.get(), base.annotations()));
+
+ getAllProviders(linkKey).stream()
+ .map(p -> new Provided<>(linkKey, p))
+ .forEach(key -> {
+ annotations.set(merge(annotations.get(),
+ linkDescriptions.get(key).annotations()));
+ });
+
+ boolean isDurable = Objects.equals(annotations.get().value(AnnotationKeys.DURABLE), "true");
+ return new DefaultLink(baseProviderId, src, dst, type, ACTIVE, isDurable, annotations.get());
+ }
+
+ // Updates, if necessary the specified link and returns the appropriate event.
+ // Guarded by linkDescs value (=locking each Link)
+ private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
+ // Note: INDIRECT -> DIRECT transition only
+ // so that BDDP discovered Link will not overwrite LDDP Link
+ if (oldLink.state() != newLink.state() ||
+ (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
+ !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
+
+ links.put(key, newLink);
+ return new LinkEvent(LINK_UPDATED, newLink);
+ }
+ return null;
+ }
+
+ @Override
+ public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) {
+ Link link = getLink(src, dst);
+ if (link == null) {
+ return null;
+ }
+
+ if (link.isDurable()) {
+ // FIXME: this will not sync link state!!!
+ return link.state() == INACTIVE ? null :
+ updateLink(linkKey(link.src(), link.dst()), link,
+ new DefaultLink(link.providerId(),
+ link.src(), link.dst(),
+ link.type(), INACTIVE,
+ link.isDurable(),
+ link.annotations()));
+ }
+ return removeLink(src, dst);
+ }
+
+ @Override
+ public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
+ final LinkKey linkKey = LinkKey.linkKey(src, dst);
+ LinkDescription removedLinkDescription =
+ linkDescriptions.remove(new Provided<>(linkKey, checkNotNull(getBaseProviderId(linkKey))));
+ if (removedLinkDescription != null) {
+ return purgeLinkCache(linkKey);
+ }
+ return null;
+ }
+
+ private LinkEvent purgeLinkCache(LinkKey linkKey) {
+ Link removedLink = links.remove(linkKey);
+ if (removedLink != null) {
+ getAllProviders(linkKey).forEach(p -> linkDescriptions.remove(new Provided<>(linkKey, p)));
+ return new LinkEvent(LINK_REMOVED, removedLink);
+ }
+ return null;
+ }
+
+ private Set<Link> filter(Collection<Link> links, Predicate<Link> predicate) {
+ return links.stream().filter(predicate).collect(Collectors.toSet());
+ }
+
+ private LinkEvent injectLink(Provided<LinkDescription> linkInjectRequest) {
+ log.trace("Received request to inject link {}", linkInjectRequest);
+
+ ProviderId providerId = linkInjectRequest.providerId();
+ LinkDescription linkDescription = linkInjectRequest.key();
+
+ final DeviceId deviceId = linkDescription.dst().deviceId();
+ if (!deviceClockService.isTimestampAvailable(deviceId)) {
+ // workaround for ONOS-1208
+ log.warn("Not ready to accept update. Dropping {}", linkInjectRequest);
+ return null;
+ }
+ return createOrUpdateLink(providerId, linkDescription);
+ }
+
+ private class InternalLinkTracker implements EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> {
+ @Override
+ public void event(EventuallyConsistentMapEvent<Provided<LinkKey>, LinkDescription> event) {
+ if (event.type() == PUT) {
+ notifyDelegate(refreshLinkCache(event.key().key()));
+ } else if (event.type() == REMOVE) {
+ notifyDelegate(purgeLinkCache(event.key().key()));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/Provided.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/Provided.java
new file mode 100644
index 0000000..b5b9e64
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/Provided.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.link.impl;
+
+import java.util.Objects;
+
+import org.onosproject.net.provider.ProviderId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Encapsulation of a provider supplied key.
+ *
+ * @param <K> key
+ */
+public class Provided<K> {
+ private final K key;
+ private final ProviderId providerId;
+
+ public Provided(K key, ProviderId providerId) {
+ this.key = key;
+ this.providerId = providerId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public K key() {
+ return key;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, providerId);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof Provided) {
+ Provided<K> that = (Provided) other;
+ return Objects.equals(key, that.key) &&
+ Objects.equals(providerId, that.providerId);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("key", key)
+ .add("providerId", providerId)
+ .toString();
+ }
+}