blob: 243caf80c912848cf0dc64d986a02d16178e0a0f [file] [log] [blame]
Madan Jampanibad538c2015-08-19 17:35:27 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.link.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static org.onosproject.net.DefaultAnnotations.merge;
20import static org.onosproject.net.DefaultAnnotations.union;
21import static org.onosproject.net.Link.State.ACTIVE;
22import static org.onosproject.net.Link.State.INACTIVE;
23import static org.onosproject.net.Link.Type.DIRECT;
24import static org.onosproject.net.Link.Type.INDIRECT;
25import static org.onosproject.net.LinkKey.linkKey;
26import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
27import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
28import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
29import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
30import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
31import static org.slf4j.LoggerFactory.getLogger;
32
33import java.util.Collection;
34import java.util.Map;
35import java.util.Objects;
36import java.util.Set;
37import java.util.concurrent.atomic.AtomicReference;
38import java.util.function.Predicate;
39import java.util.stream.Collectors;
40
41import org.apache.felix.scr.annotations.Activate;
42import org.apache.felix.scr.annotations.Component;
43import org.apache.felix.scr.annotations.Deactivate;
44import org.apache.felix.scr.annotations.Reference;
45import org.apache.felix.scr.annotations.ReferenceCardinality;
46import org.apache.felix.scr.annotations.Service;
47import org.onlab.util.KryoNamespace;
48import org.onlab.util.SharedExecutors;
49import org.onosproject.cluster.ClusterService;
50import org.onosproject.cluster.NodeId;
51import org.onosproject.mastership.MastershipService;
52import org.onosproject.net.AnnotationKeys;
53import org.onosproject.net.AnnotationsUtil;
54import org.onosproject.net.ConnectPoint;
55import org.onosproject.net.DefaultAnnotations;
56import org.onosproject.net.DefaultLink;
57import org.onosproject.net.DeviceId;
58import org.onosproject.net.Link;
59import org.onosproject.net.LinkKey;
60import org.onosproject.net.Link.Type;
61import org.onosproject.net.device.DeviceClockService;
62import org.onosproject.net.link.DefaultLinkDescription;
63import org.onosproject.net.link.LinkDescription;
64import org.onosproject.net.link.LinkEvent;
65import org.onosproject.net.link.LinkStore;
66import org.onosproject.net.link.LinkStoreDelegate;
67import org.onosproject.net.provider.ProviderId;
68import org.onosproject.store.AbstractStore;
69import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
70import org.onosproject.store.cluster.messaging.MessageSubject;
71import org.onosproject.store.impl.MastershipBasedTimestamp;
72import org.onosproject.store.serializers.KryoNamespaces;
73import org.onosproject.store.serializers.KryoSerializer;
74import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
75import org.onosproject.store.service.EventuallyConsistentMap;
76import org.onosproject.store.service.EventuallyConsistentMapEvent;
77import org.onosproject.store.service.EventuallyConsistentMapListener;
78import org.onosproject.store.service.StorageService;
79import org.slf4j.Logger;
80
81import com.google.common.collect.Iterables;
82import com.google.common.collect.Maps;
83import com.google.common.util.concurrent.Futures;
84
85/**
86 * Manages the inventory of links using a {@code EventuallyConsistentMap}.
87 */
Madan Jampani8856ee12015-08-21 09:24:27 -070088@Component(immediate = true, enabled = true)
Madan Jampanibad538c2015-08-19 17:35:27 -070089@Service
90public class ECLinkStore
91 extends AbstractStore<LinkEvent, LinkStoreDelegate>
92 implements LinkStore {
93
94 private final Logger log = getLogger(getClass());
95
96 private final Map<LinkKey, Link> links = Maps.newConcurrentMap();
97 private EventuallyConsistentMap<Provided<LinkKey>, LinkDescription> linkDescriptions;
98
99 private static final MessageSubject LINK_INJECT_MESSAGE = new MessageSubject("inject-link-request");
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected StorageService storageService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected MastershipService mastershipService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected DeviceClockService deviceClockService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected ClusterCommunicationService clusterCommunicator;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ClusterService clusterService;
115
116 private EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> linkTracker =
117 new InternalLinkTracker();
118
119 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
120 @Override
121 protected void setupKryoPool() {
122 serializerPool = KryoNamespace.newBuilder()
123 .register(DistributedStoreSerializers.STORE_COMMON)
124 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
125 .register(Provided.class)
126 .build();
127 }
128 };
129
130 @Activate
131 public void activate() {
132 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
133 .register(KryoNamespaces.API)
134 .register(MastershipBasedTimestamp.class)
135 .register(Provided.class);
136
137 linkDescriptions = storageService.<Provided<LinkKey>, LinkDescription>eventuallyConsistentMapBuilder()
138 .withName("onos-link-descriptions")
139 .withSerializer(serializer)
140 .withTimestampProvider((k, v) -> {
141 try {
142 return v == null ? null : deviceClockService.getTimestamp(v.dst().deviceId());
143 } catch (IllegalStateException e) {
144 return null;
145 }
146 }).build();
147
148 clusterCommunicator.addSubscriber(LINK_INJECT_MESSAGE,
149 SERIALIZER::decode,
150 this::injectLink,
Madan Jampanif8d572f2015-08-25 12:14:43 -0700151 SERIALIZER::encode,
Madan Jampanibad538c2015-08-19 17:35:27 -0700152 SharedExecutors.getPoolThreadExecutor());
153
154 linkDescriptions.addListener(linkTracker);
155
156 log.info("Started");
157 }
158
159 @Deactivate
160 public void deactivate() {
161 linkDescriptions.removeListener(linkTracker);
162 linkDescriptions.destroy();
163 links.clear();
164 clusterCommunicator.removeSubscriber(LINK_INJECT_MESSAGE);
165
166 log.info("Stopped");
167 }
168
169 @Override
170 public int getLinkCount() {
171 return links.size();
172 }
173
174 @Override
175 public Iterable<Link> getLinks() {
176 return links.values();
177 }
178
179 @Override
180 public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
181 return filter(links.values(), link -> deviceId.equals(link.src().deviceId()));
182 }
183
184 @Override
185 public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
186 return filter(links.values(), link -> deviceId.equals(link.dst().deviceId()));
187 }
188
189 @Override
190 public Link getLink(ConnectPoint src, ConnectPoint dst) {
191 return links.get(linkKey(src, dst));
192 }
193
194 @Override
195 public Set<Link> getEgressLinks(ConnectPoint src) {
196 return filter(links.values(), link -> src.equals(link.src()));
197 }
198
199 @Override
200 public Set<Link> getIngressLinks(ConnectPoint dst) {
201 return filter(links.values(), link -> dst.equals(link.dst()));
202 }
203
204 @Override
205 public LinkEvent createOrUpdateLink(ProviderId providerId,
206 LinkDescription linkDescription) {
207 final DeviceId dstDeviceId = linkDescription.dst().deviceId();
208 final NodeId dstNodeId = mastershipService.getMasterFor(dstDeviceId);
209
210 // Process link update only if we're the master of the destination node,
211 // otherwise signal the actual master.
212 if (clusterService.getLocalNode().id().equals(dstNodeId)) {
213 LinkKey linkKey = linkKey(linkDescription.src(), linkDescription.dst());
Ayaka Koshibe2c59acf2015-09-08 15:37:47 -0700214 Provided<LinkKey> internalLinkKey = getProvided(linkKey, providerId);
215 if (internalLinkKey == null) {
216 return null;
217 }
Madan Jampanibad538c2015-08-19 17:35:27 -0700218 linkDescriptions.compute(internalLinkKey, (k, v) -> createOrUpdateLinkInternal(v , linkDescription));
219 return refreshLinkCache(linkKey);
220 } else {
221 if (dstNodeId == null) {
222 return null;
223 }
224 return Futures.getUnchecked(clusterCommunicator.sendAndReceive(new Provided<>(linkDescription, providerId),
225 LINK_INJECT_MESSAGE,
226 SERIALIZER::encode,
227 SERIALIZER::decode,
228 dstNodeId));
229 }
230 }
231
Ayaka Koshibe2c59acf2015-09-08 15:37:47 -0700232 private Provided<LinkKey> getProvided(LinkKey linkKey, ProviderId provId) {
233 ProviderId bpid = getBaseProviderId(linkKey);
234 if (provId == null) {
235 // The LinkService didn't know who this LinkKey belongs to.
236 // A fix is to either modify the getProvider() in LinkService classes
237 // or expose the contents of linkDescriptions to the LinkService.
238 return (bpid == null) ? null : new Provided<>(linkKey, bpid);
239 } else {
240 return new Provided<>(linkKey, provId);
241 }
242 }
243
Madan Jampanibad538c2015-08-19 17:35:27 -0700244 private LinkDescription createOrUpdateLinkInternal(LinkDescription current, LinkDescription updated) {
245 if (current != null) {
246 // we only allow transition from INDIRECT -> DIRECT
247 return new DefaultLinkDescription(
248 current.src(),
249 current.dst(),
250 current.type() == DIRECT ? DIRECT : updated.type(),
251 union(current.annotations(), updated.annotations()));
252 }
253 return updated;
254 }
255
256 private LinkEvent refreshLinkCache(LinkKey linkKey) {
257 AtomicReference<LinkEvent.Type> eventType = new AtomicReference<>();
258 Link link = links.compute(linkKey, (key, existingLink) -> {
259 Link newLink = composeLink(linkKey);
260 if (existingLink == null) {
261 eventType.set(LINK_ADDED);
262 return newLink;
263 } else if (existingLink.state() != newLink.state() ||
264 (existingLink.type() == INDIRECT && newLink.type() == DIRECT) ||
265 !AnnotationsUtil.isEqual(existingLink.annotations(), newLink.annotations())) {
266 eventType.set(LINK_UPDATED);
267 return newLink;
268 } else {
269 return existingLink;
270 }
271 });
272 return eventType.get() != null ? new LinkEvent(eventType.get(), link) : null;
273 }
274
275 private Set<ProviderId> getAllProviders(LinkKey linkKey) {
276 return linkDescriptions.keySet()
277 .stream()
278 .filter(key -> key.key().equals(linkKey))
279 .map(key -> key.providerId())
280 .collect(Collectors.toSet());
281 }
282
283 private ProviderId getBaseProviderId(LinkKey linkKey) {
284 Set<ProviderId> allProviders = getAllProviders(linkKey);
285 if (allProviders.size() > 0) {
286 return allProviders.stream()
287 .filter(p -> !p.isAncillary())
288 .findFirst()
289 .orElse(Iterables.getFirst(allProviders, null));
290 }
291 return null;
292 }
293
294 private Link composeLink(LinkKey linkKey) {
295
296 ProviderId baseProviderId = checkNotNull(getBaseProviderId(linkKey));
297 LinkDescription base = linkDescriptions.get(new Provided<>(linkKey, baseProviderId));
298
299 ConnectPoint src = base.src();
300 ConnectPoint dst = base.dst();
301 Type type = base.type();
302 AtomicReference<DefaultAnnotations> annotations = new AtomicReference<>(DefaultAnnotations.builder().build());
303 annotations.set(merge(annotations.get(), base.annotations()));
304
305 getAllProviders(linkKey).stream()
306 .map(p -> new Provided<>(linkKey, p))
307 .forEach(key -> {
308 annotations.set(merge(annotations.get(),
309 linkDescriptions.get(key).annotations()));
310 });
311
312 boolean isDurable = Objects.equals(annotations.get().value(AnnotationKeys.DURABLE), "true");
313 return new DefaultLink(baseProviderId, src, dst, type, ACTIVE, isDurable, annotations.get());
314 }
315
316 // Updates, if necessary the specified link and returns the appropriate event.
317 // Guarded by linkDescs value (=locking each Link)
318 private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
319 // Note: INDIRECT -> DIRECT transition only
320 // so that BDDP discovered Link will not overwrite LDDP Link
321 if (oldLink.state() != newLink.state() ||
322 (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
323 !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
324
325 links.put(key, newLink);
326 return new LinkEvent(LINK_UPDATED, newLink);
327 }
328 return null;
329 }
330
331 @Override
332 public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) {
333 Link link = getLink(src, dst);
334 if (link == null) {
335 return null;
336 }
337
338 if (link.isDurable()) {
339 // FIXME: this will not sync link state!!!
340 return link.state() == INACTIVE ? null :
341 updateLink(linkKey(link.src(), link.dst()), link,
342 new DefaultLink(link.providerId(),
343 link.src(), link.dst(),
344 link.type(), INACTIVE,
345 link.isDurable(),
346 link.annotations()));
347 }
348 return removeLink(src, dst);
349 }
350
351 @Override
352 public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
353 final LinkKey linkKey = LinkKey.linkKey(src, dst);
Madan Jampani5d4396f2015-09-02 16:04:20 -0700354 ProviderId primaryProviderId = getBaseProviderId(linkKey);
355 // Stop if there is no base provider.
356 if (primaryProviderId == null) {
357 return null;
358 }
Madan Jampanibad538c2015-08-19 17:35:27 -0700359 LinkDescription removedLinkDescription =
Madan Jampani5d4396f2015-09-02 16:04:20 -0700360 linkDescriptions.remove(new Provided<>(linkKey, primaryProviderId));
Madan Jampanibad538c2015-08-19 17:35:27 -0700361 if (removedLinkDescription != null) {
362 return purgeLinkCache(linkKey);
363 }
364 return null;
365 }
366
367 private LinkEvent purgeLinkCache(LinkKey linkKey) {
368 Link removedLink = links.remove(linkKey);
369 if (removedLink != null) {
370 getAllProviders(linkKey).forEach(p -> linkDescriptions.remove(new Provided<>(linkKey, p)));
371 return new LinkEvent(LINK_REMOVED, removedLink);
372 }
373 return null;
374 }
375
376 private Set<Link> filter(Collection<Link> links, Predicate<Link> predicate) {
377 return links.stream().filter(predicate).collect(Collectors.toSet());
378 }
379
380 private LinkEvent injectLink(Provided<LinkDescription> linkInjectRequest) {
381 log.trace("Received request to inject link {}", linkInjectRequest);
382
383 ProviderId providerId = linkInjectRequest.providerId();
384 LinkDescription linkDescription = linkInjectRequest.key();
385
386 final DeviceId deviceId = linkDescription.dst().deviceId();
387 if (!deviceClockService.isTimestampAvailable(deviceId)) {
388 // workaround for ONOS-1208
389 log.warn("Not ready to accept update. Dropping {}", linkInjectRequest);
390 return null;
391 }
392 return createOrUpdateLink(providerId, linkDescription);
393 }
394
395 private class InternalLinkTracker implements EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> {
396 @Override
397 public void event(EventuallyConsistentMapEvent<Provided<LinkKey>, LinkDescription> event) {
398 if (event.type() == PUT) {
399 notifyDelegate(refreshLinkCache(event.key().key()));
400 } else if (event.type() == REMOVE) {
401 notifyDelegate(purgeLinkCache(event.key().key()));
402 }
403 }
404 }
405}