blob: 2240eb4f5cb7e194fd4dac45f7ea9acf4132be65 [file] [log] [blame]
/*
* Copyright 2015-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.link.impl;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SharedExecutors;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.AnnotationsUtil;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DefaultLink;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.Link.Type;
import org.onosproject.net.LinkKey;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.device.DeviceClockService;
import org.onosproject.net.link.DefaultLinkDescription;
import org.onosproject.net.link.LinkDescription;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.link.LinkStore;
import org.onosproject.net.link.LinkStoreDelegate;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.DefaultAnnotations.union;
import static org.onosproject.net.Link.State.ACTIVE;
import static org.onosproject.net.Link.State.INACTIVE;
import static org.onosproject.net.Link.Type.DIRECT;
import static org.onosproject.net.Link.Type.INDIRECT;
import static org.onosproject.net.LinkKey.linkKey;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages the inventory of links using an {@code EventuallyConsistentMap}.
*/
@Component(immediate = true)
@Service
public class ECLinkStore
extends AbstractStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
/**
* Modes for dealing with newly discovered links.
* <p>
* The store consults the active mode when composing a link (to decide its
* initial state and whether it is expected) and when a link is removed or
* taken down.
*/
protected enum LinkDiscoveryMode {
/**
* Permissive mode - all newly discovered links are valid.
* Composed links start out ACTIVE in this mode.
*/
PERMISSIVE,
/**
* Strict mode - all newly discovered links must be defined in
* the network config.
* Composed links are ACTIVE only when their base description is expected.
*/
STRICT
}
// Logger named after the runtime class of this store.
private final Logger log = getLogger(getClass());
// Local cache of composed links, keyed by link key; rebuilt from linkDescriptions.
private final Map<LinkKey, Link> links = Maps.newConcurrentMap();
// Providers known to have supplied a description for each link key.
private final Map<LinkKey, Set<ProviderId>> linkProviders = Maps.newConcurrentMap();
// Per-provider link descriptions; created in activate() through the storage service.
private EventuallyConsistentMap<Provided<LinkKey>, LinkDescription> linkDescriptions;
// Application id registered in activate(); used to look up CoreConfig.
private ApplicationId appId;
// Subject of the cluster message used to hand a link update over to another node.
private static final MessageSubject LINK_INJECT_MESSAGE = new MessageSubject("inject-link-request");
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceClockService deviceClockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigRegistry netCfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
// Listener that mirrors linkDescriptions changes into linkProviders and links.
private EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> linkTracker =
new InternalLinkTracker();
// Listener for config changes
private final InternalConfigListener cfgListener = new InternalConfigListener();
// Active discovery mode; STRICT until reconfigure() is first invoked from activate().
protected LinkDiscoveryMode linkDiscoveryMode = LinkDiscoveryMode.STRICT;
// Serializer used to encode/decode LINK_INJECT_MESSAGE requests and their replies.
protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
KryoNamespace.newBuilder()
.register(DistributedStoreSerializers.STORE_COMMON)
.nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
.register(Provided.class)
.build("ECLink"));
/**
 * Brings the store up: registers the core application and its config
 * factory, applies the current core configuration, builds the replicated
 * link-description map, subscribes to link injection requests from peers
 * and finally starts tracking map changes.
 */
@Activate
public void activate() {
    appId = coreService.registerApplication("org.onosproject.core");

    // Configuration plumbing; linkDescriptions is still unset while reconfigure() runs here.
    netCfgService.registerConfigFactory(factory);
    netCfgService.addListener(cfgListener);
    CoreConfig currentConfig = netCfgService.getConfig(appId, CoreConfig.class);
    cfgListener.reconfigure(currentConfig);

    KryoNamespace.Builder mapSerializer = KryoNamespace.newBuilder()
            .register(KryoNamespaces.API)
            .register(MastershipBasedTimestamp.class)
            .register(Provided.class);

    linkDescriptions = storageService.<Provided<LinkKey>, LinkDescription>eventuallyConsistentMapBuilder()
            .withName("onos-link-descriptions")
            .withSerializer(mapSerializer)
            .withTimestampProvider((key, description) -> {
                // No timestamp for a missing value, nor when the device clock
                // rejects the request with an IllegalStateException.
                if (description == null) {
                    return null;
                }
                try {
                    return deviceClockService.getTimestamp(description.dst().deviceId());
                } catch (IllegalStateException e) {
                    return null;
                }
            })
            .build();

    // Peers forward link updates for devices this node masters via this subject.
    clusterCommunicator.addSubscriber(LINK_INJECT_MESSAGE,
                                      SERIALIZER::decode,
                                      this::injectLink,
                                      SERIALIZER::encode,
                                      SharedExecutors.getPoolThreadExecutor());

    linkDescriptions.addListener(linkTracker);
    log.info("Started");
}
/**
* Shuts the store down: detaches the map listener, destroys the
* link-description map, empties the local caches and unregisters the
* cluster subscriber, config listener and config factory.
*/
@Deactivate
public void deactivate() {
// Stop reacting to map events before the map itself is torn down.
linkDescriptions.removeListener(linkTracker);
linkDescriptions.destroy();
// Drop locally cached state.
linkProviders.clear();
links.clear();
// Undo the registrations made in activate().
clusterCommunicator.removeSubscriber(LINK_INJECT_MESSAGE);
netCfgService.removeListener(cfgListener);
netCfgService.unregisterConfigFactory(factory);
log.info("Stopped");
}
/**
 * Returns the number of links currently held in the local link cache.
 *
 * @return size of the link cache
 */
@Override
public int getLinkCount() {
    final int cachedLinks = links.size();
    return cachedLinks;
}
/**
 * Returns all links currently held in the local link cache.
 * <p>
 * The result is a read-only view: it reflects later cache changes, but
 * callers cannot remove entries from the store's cache through its iterator
 * (which would bypass event generation).
 *
 * @return read-only iterable over the cached links
 */
@Override
public Iterable<Link> getLinks() {
    return Iterables.unmodifiableIterable(links.values());
}
/**
 * Collects the cached links whose source connect point is on the given device.
 *
 * @param deviceId device identifier
 * @return set of links leaving the device
 */
@Override
public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
    Predicate<Link> leavesDevice = candidate -> deviceId.equals(candidate.src().deviceId());
    return filter(links.values(), leavesDevice);
}
/**
 * Collects the cached links whose destination connect point is on the given device.
 *
 * @param deviceId device identifier
 * @return set of links entering the device
 */
@Override
public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
    Predicate<Link> entersDevice = candidate -> deviceId.equals(candidate.dst().deviceId());
    return filter(links.values(), entersDevice);
}
/**
 * Looks up the cached link between the two connect points.
 *
 * @param src source connect point
 * @param dst destination connect point
 * @return the cached link, or null if the cache holds no such link
 */
@Override
public Link getLink(ConnectPoint src, ConnectPoint dst) {
    LinkKey key = linkKey(src, dst);
    return links.get(key);
}
/**
 * Collects the cached links that originate at the given connect point.
 *
 * @param src source connect point
 * @return set of links whose source equals {@code src}
 */
@Override
public Set<Link> getEgressLinks(ConnectPoint src) {
    Predicate<Link> startsAtSrc = candidate -> src.equals(candidate.src());
    return filter(links.values(), startsAtSrc);
}
/**
 * Collects the cached links that terminate at the given connect point.
 *
 * @param dst destination connect point
 * @return set of links whose destination equals {@code dst}
 */
@Override
public Set<Link> getIngressLinks(ConnectPoint dst) {
    Predicate<Link> endsAtDst = candidate -> dst.equals(candidate.dst());
    return filter(links.values(), endsAtDst);
}
/**
 * Creates a new link or updates an existing one from the given description.
 * <p>
 * When the local node is master of the destination device, the description
 * is merged into the link-description map and the local link cache is
 * refreshed. Otherwise the request is forwarded to the master, but only for
 * providers whose scheme is {@code "cfg"} or {@code "null"}.
 *
 * @param providerId      provider of the description; may be null, in which
 *                        case the link's known base provider is used on the
 *                        master and nothing is forwarded on other nodes
 * @param linkDescription description of the link
 * @return the resulting link event, or null if nothing changed, the request
 *         was not processed, or no provider could be determined
 */
@Override
public LinkEvent createOrUpdateLink(ProviderId providerId,
                                    LinkDescription linkDescription) {
    final DeviceId dstDeviceId = linkDescription.dst().deviceId();
    final NodeId dstNodeId = mastershipService.getMasterFor(dstDeviceId);

    // Process link update only if we're the master of the destination node,
    // otherwise signal the actual master.
    if (clusterService.getLocalNode().id().equals(dstNodeId)) {
        LinkKey linkKey = linkKey(linkDescription.src(), linkDescription.dst());
        Provided<LinkKey> internalLinkKey = getProvided(linkKey, providerId);
        if (internalLinkKey == null) {
            return null;
        }
        linkDescriptions.compute(internalLinkKey, (k, v) -> createOrUpdateLinkInternal(v, linkDescription));
        return refreshLinkCache(linkKey);
    }

    // An unknown provider can be neither ConfigProvider nor NullProvider, so
    // there is nothing to forward; this also avoids dereferencing null below.
    if (providerId == null) {
        return null;
    }

    // Only forward for ConfigProvider or NullProvider
    // Forwarding was added as a workaround for ONOS-490
    final String scheme = providerId.scheme();
    if (!"cfg".equals(scheme) && !"null".equals(scheme)) {
        return null;
    }

    // Temporary hack for NPE (ONOS-1171).
    // Proper fix is to implement forwarding to master on ConfigProvider
    if (dstNodeId == null) {
        return null;
    }

    return Futures.getUnchecked(clusterCommunicator.sendAndReceive(new Provided<>(linkDescription, providerId),
                                                                   LINK_INJECT_MESSAGE,
                                                                   SERIALIZER::encode,
                                                                   SERIALIZER::decode,
                                                                   dstNodeId));
}
/**
 * Builds the description-map key for a link and provider.
 *
 * @param linkKey link key
 * @param provId  provider id, or null when the caller does not know it
 * @return key pairing the link with {@code provId}; when {@code provId} is
 *         null, the link's base provider is used instead, and null is
 *         returned if no base provider is known either
 */
private Provided<LinkKey> getProvided(LinkKey linkKey, ProviderId provId) {
    final ProviderId baseProvider = getBaseProviderId(linkKey);
    if (provId != null) {
        return new Provided<>(linkKey, provId);
    }
    // The LinkService didn't know who this LinkKey belongs to.
    // A fix is to either modify the getProvider() in LinkService classes
    // or expose the contents of linkDescriptions to the LinkService.
    if (baseProvider == null) {
        return null;
    }
    return new Provided<>(linkKey, baseProvider);
}
/**
 * Merges an incoming description into the one already stored for the same key.
 *
 * @param current description currently stored, or null if there is none
 * @param updated incoming description
 * @return {@code updated} when nothing is stored yet; otherwise a description
 *         keeping the stored end points and expected flag, with the union of
 *         both annotation sets and the type resolved as described below
 */
private LinkDescription createOrUpdateLinkInternal(LinkDescription current, LinkDescription updated) {
    if (current == null) {
        return updated;
    }

    // mask transition from DIRECT -> INDIRECT, likely to be triggered by BDDP
    final boolean maskDowngrade = current.type() == DIRECT && updated.type() == Type.INDIRECT;
    final Type resolvedType = maskDowngrade ? Type.DIRECT : updated.type();

    return new DefaultLinkDescription(current.src(),
                                      current.dst(),
                                      resolvedType,
                                      current.isExpected(),
                                      union(current.annotations(), updated.annotations()));
}
/**
 * Adds a provider to the set of providers recorded for a link.
 *
 * @param current    providers recorded so far, or null if none
 * @param providerId provider to add
 * @return {@code current} (or a fresh concurrent set when it was null) with
 *         {@code providerId} added
 */
private Set<ProviderId> createOrUpdateLinkProviders(Set<ProviderId> current, ProviderId providerId) {
    final Set<ProviderId> providers =
            (current != null) ? current : Sets.<ProviderId>newConcurrentHashSet();
    providers.add(providerId);
    return providers;
}
/**
 * Recomposes the link for the given key and reconciles the local cache with it.
 * <p>
 * The cache entry is replaced when the link is new, or when its state,
 * expected flag, type or annotations differ from the cached one. If the link
 * cannot be composed, the remapping function yields null, which (per
 * {@code Map.compute}) leaves no mapping for the key and produces no event.
 *
 * @param linkKey key of the link to refresh
 * @return LINK_ADDED or LINK_UPDATED event for the cached link, or null if
 *         the cache did not change in a way that warrants an event
 */
private LinkEvent refreshLinkCache(LinkKey linkKey) {
    final AtomicReference<LinkEvent.Type> pendingEvent = new AtomicReference<>();

    final Link cached = links.compute(linkKey, (key, previous) -> {
        final Link composed = composeLink(linkKey);
        if (composed == null) {
            return null;
        }
        if (previous == null) {
            pendingEvent.set(LINK_ADDED);
            return composed;
        }
        final boolean unchanged = previous.state() == composed.state()
                && previous.isExpected() == composed.isExpected()
                && previous.type() == composed.type()
                && AnnotationsUtil.isEqual(previous.annotations(), composed.annotations());
        if (unchanged) {
            return previous;
        }
        pendingEvent.set(LINK_UPDATED);
        return composed;
    });

    final LinkEvent.Type type = pendingEvent.get();
    return type == null ? null : new LinkEvent(type, cached);
}
/**
 * Returns the providers recorded for the given link.
 *
 * @param linkKey link key
 * @return the recorded provider set, or a new empty set (not stored in the
 *         map) when nothing is recorded for the key
 */
private Set<ProviderId> getAllProviders(LinkKey linkKey) {
    final Set<ProviderId> noProviders = Sets.newConcurrentHashSet();
    return linkProviders.getOrDefault(linkKey, noProviders);
}
/**
 * Picks the base provider of a link: the first non-ancillary provider
 * recorded for it, falling back to the first recorded provider of any kind.
 *
 * @param linkKey link key
 * @return the base provider id, or null when no provider is recorded
 */
private ProviderId getBaseProviderId(LinkKey linkKey) {
    final Set<ProviderId> candidates = getAllProviders(linkKey);
    if (candidates.isEmpty()) {
        return null;
    }
    final ProviderId firstOfAny = Iterables.getFirst(candidates, null);
    return candidates.stream()
            .filter(candidate -> !candidate.isAncillary())
            .findFirst()
            .orElse(firstOfAny);
}
/**
 * Builds the link for a key from the descriptions currently stored for it.
 * <p>
 * End points and type come from the base provider's description; annotations
 * are the base annotations merged with those of every recorded provider.
 * In PERMISSIVE mode the link is ACTIVE and is expected only when its
 * {@code AnnotationKeys.DURABLE} annotation equals "true"; otherwise state
 * and expected flag follow the base description's expected flag.
 *
 * @param linkKey link key
 * @return the composed link, or null when no base provider or no base
 *         description exists for the key
 */
private Link composeLink(LinkKey linkKey) {
    final ProviderId baseProviderId = getBaseProviderId(linkKey);
    if (baseProviderId == null) {
        // provider was not found, this means it was already removed by the
        // parent component.
        return null;
    }

    final LinkDescription base = linkDescriptions.get(new Provided<>(linkKey, baseProviderId));
    if (base == null) {
        // short circuit if link description no longer exists
        return null;
    }

    // Start from the base annotations, then fold in each provider's contribution.
    DefaultAnnotations merged = merge(DefaultAnnotations.builder().build(), base.annotations());
    for (ProviderId provider : getAllProviders(linkKey)) {
        final LinkDescription contributed = linkDescriptions.get(new Provided<>(linkKey, provider));
        if (contributed != null) {
            merged = merge(merged, contributed.annotations());
        }
    }

    final Link.State initialState;
    final boolean expected;
    if (linkDiscoveryMode != LinkDiscoveryMode.PERMISSIVE) {
        expected = base.isExpected();
        initialState = expected ? ACTIVE : INACTIVE;
    } else {
        expected = Objects.equals(merged.value(AnnotationKeys.DURABLE), "true");
        initialState = ACTIVE;
    }

    return DefaultLink.builder()
            .providerId(baseProviderId)
            .src(base.src())
            .dst(base.dst())
            .type(base.type())
            .state(initialState)
            .isExpected(expected)
            .annotations(merged)
            .build();
}
/**
 * Replaces the cached link if the new one differs in a way that matters.
 * <p>
 * Only the local link cache is written here.
 *
 * @param key     link key
 * @param oldLink link currently known
 * @param newLink candidate replacement
 * @return LINK_UPDATED event carrying {@code newLink}, or null when nothing
 *         relevant changed
 */
private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
    // Note: INDIRECT -> DIRECT transition only
    // so that BDDP discovered Link will not overwrite LDDP Link
    final boolean unchanged = oldLink.state() == newLink.state()
            && !(oldLink.type() == INDIRECT && newLink.type() == DIRECT)
            && AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations());
    if (unchanged) {
        return null;
    }
    links.put(key, newLink);
    return new LinkEvent(LINK_UPDATED, newLink);
}
/**
 * Removes the link between the two connect points, or merely marks it
 * INACTIVE when running in PERMISSIVE mode and the link is expected.
 *
 * @param src source connect point
 * @param dst destination connect point
 * @return the resulting link event, or null if the link is unknown or
 *         nothing changed
 */
@Override
public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) {
    final Link link = getLink(src, dst);
    if (link == null) {
        return null;
    }

    final boolean keepButDeactivate =
            linkDiscoveryMode == LinkDiscoveryMode.PERMISSIVE && link.isExpected();
    if (!keepButDeactivate) {
        return removeLink(src, dst);
    }

    // FIXME: this will not sync link state!!!
    if (link.state() == INACTIVE) {
        return null;
    }
    final Link deactivated = DefaultLink.builder()
            .providerId(link.providerId())
            .src(link.src())
            .dst(link.dst())
            .type(link.type())
            .state(INACTIVE)
            .isExpected(link.isExpected())
            .annotations(link.annotations())
            .build();
    return updateLink(linkKey(link.src(), link.dst()), link, deactivated);
}
/**
 * Removes the link between the two connect points by removing its base
 * provider's description and then purging the link from the local cache.
 *
 * @param src source connect point
 * @param dst destination connect point
 * @return LINK_REMOVED event, or null if there was no base provider, no
 *         stored description, or no cached link to remove
 */
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
    final LinkKey key = linkKey(src, dst);

    // Stop if there is no base provider.
    final ProviderId primary = getBaseProviderId(key);
    if (primary == null) {
        return null;
    }

    final LinkDescription removed = linkDescriptions.remove(new Provided<>(key, primary));
    return removed == null ? null : purgeLinkCache(key);
}
/**
 * Drops a link from the local cache and, if it was cached, also removes
 * every recorded provider's description and the provider record itself.
 *
 * @param linkKey key of the link to purge
 * @return LINK_REMOVED event for the dropped link, or null if the link was
 *         not cached
 */
private LinkEvent purgeLinkCache(LinkKey linkKey) {
    final Link removedLink = links.remove(linkKey);
    if (removedLink == null) {
        return null;
    }
    for (ProviderId provider : getAllProviders(linkKey)) {
        linkDescriptions.remove(new Provided<>(linkKey, provider));
    }
    linkProviders.remove(linkKey);
    return new LinkEvent(LINK_REMOVED, removedLink);
}
/**
 * Selects the links that satisfy a predicate.
 *
 * @param links     links to examine
 * @param predicate condition a link must satisfy to be selected
 * @return new set holding the matching links
 */
private Set<Link> filter(Collection<Link> links, Predicate<Link> predicate) {
    final Set<Link> matching = Sets.newHashSet();
    for (Link candidate : links) {
        if (predicate.test(candidate)) {
            matching.add(candidate);
        }
    }
    return matching;
}
/**
 * Handles a link injection request received from another cluster node.
 *
 * @param linkInjectRequest link description paired with its provider id
 * @return result of {@code createOrUpdateLink}, or null when the request is
 *         dropped because no timestamp is available for the destination device
 */
private LinkEvent injectLink(Provided<LinkDescription> linkInjectRequest) {
    log.trace("Received request to inject link {}", linkInjectRequest);

    final LinkDescription description = linkInjectRequest.key();
    final ProviderId provider = linkInjectRequest.providerId();
    final DeviceId dstDevice = description.dst().deviceId();

    if (deviceClockService.isTimestampAvailable(dstDevice)) {
        return createOrUpdateLink(provider, description);
    }

    // workaround for ONOS-1208
    log.warn("Not ready to accept update. Dropping {}", linkInjectRequest);
    return null;
}
private class InternalLinkTracker implements EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> {
@Override
public void event(EventuallyConsistentMapEvent<Provided<LinkKey>, LinkDescription> event) {
if (event.type() == PUT) {
linkProviders.compute(event.key().key(), (k, v) ->
createOrUpdateLinkProviders(v, event.key().providerId()));
notifyDelegate(refreshLinkCache(event.key().key()));
} else if (event.type() == REMOVE) {
notifyDelegate(purgeLinkCache(event.key().key()));
linkProviders.remove(event.key().key());
}
}
}
/**
 * Applies core configuration changes to the store's link discovery mode.
 */
private class InternalConfigListener implements NetworkConfigListener {

    /**
     * Sets the link discovery mode from the given configuration.
     *
     * @param coreConfig core configuration; null selects PERMISSIVE mode
     */
    void reconfigure(CoreConfig coreConfig) {
        linkDiscoveryMode = (coreConfig == null)
                ? LinkDiscoveryMode.PERMISSIVE
                : coreConfig.linkDiscoveryMode();

        if (linkDiscoveryMode == LinkDiscoveryMode.STRICT) {
            // Remove any previous links to force them to go through the strict
            // discovery process
            if (linkDescriptions != null) {
                linkDescriptions.clear();
            }
            links.clear();
        }

        log.debug("config set link discovery mode to {}",
                  linkDiscoveryMode.name());
    }

    @Override
    public void event(NetworkConfigEvent event) {
        final NetworkConfigEvent.Type kind = event.type();
        final boolean addedOrUpdated = kind == NetworkConfigEvent.Type.CONFIG_ADDED
                || kind == NetworkConfigEvent.Type.CONFIG_UPDATED;
        if (!addedOrUpdated || !event.configClass().equals(CoreConfig.class)) {
            return;
        }
        reconfigure(netCfgService.getConfig(appId, CoreConfig.class));
        log.info("Reconfigured");
    }
}
// Configuration properties factory: creates CoreConfig instances for
// application subjects under the "core" config key. Declared with its type
// arguments rather than as a raw type so uses of the field stay type-checked.
private final ConfigFactory<ApplicationId, CoreConfig> factory =
        new ConfigFactory<ApplicationId, CoreConfig>(APP_SUBJECT_FACTORY,
                                                     CoreConfig.class,
                                                     "core") {
            @Override
            public CoreConfig createConfig() {
                return new CoreConfig();
            }
        };
}