blob: 48851f70fb9c1ebfcbfd581410f7026a09a7ea83 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.link.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static org.onosproject.net.DefaultAnnotations.merge;
20import static org.onosproject.net.DefaultAnnotations.union;
21import static org.onosproject.net.Link.State.ACTIVE;
22import static org.onosproject.net.Link.State.INACTIVE;
23import static org.onosproject.net.Link.Type.DIRECT;
24import static org.onosproject.net.Link.Type.INDIRECT;
25import static org.onosproject.net.LinkKey.linkKey;
26import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
27import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
28import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
29import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
30import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
31import static org.slf4j.LoggerFactory.getLogger;
32
33import java.util.Collection;
34import java.util.Map;
35import java.util.Objects;
36import java.util.Set;
37import java.util.concurrent.atomic.AtomicReference;
38import java.util.function.Predicate;
39import java.util.stream.Collectors;
40
41import org.apache.felix.scr.annotations.Activate;
42import org.apache.felix.scr.annotations.Component;
43import org.apache.felix.scr.annotations.Deactivate;
44import org.apache.felix.scr.annotations.Reference;
45import org.apache.felix.scr.annotations.ReferenceCardinality;
46import org.apache.felix.scr.annotations.Service;
47import org.onlab.util.KryoNamespace;
48import org.onlab.util.SharedExecutors;
49import org.onosproject.cluster.ClusterService;
50import org.onosproject.cluster.NodeId;
51import org.onosproject.mastership.MastershipService;
52import org.onosproject.net.AnnotationKeys;
53import org.onosproject.net.AnnotationsUtil;
54import org.onosproject.net.ConnectPoint;
55import org.onosproject.net.DefaultAnnotations;
56import org.onosproject.net.DefaultLink;
57import org.onosproject.net.DeviceId;
58import org.onosproject.net.Link;
59import org.onosproject.net.LinkKey;
60import org.onosproject.net.Link.Type;
61import org.onosproject.net.device.DeviceClockService;
62import org.onosproject.net.link.DefaultLinkDescription;
63import org.onosproject.net.link.LinkDescription;
64import org.onosproject.net.link.LinkEvent;
65import org.onosproject.net.link.LinkStore;
66import org.onosproject.net.link.LinkStoreDelegate;
67import org.onosproject.net.provider.ProviderId;
68import org.onosproject.store.AbstractStore;
69import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
70import org.onosproject.store.cluster.messaging.MessageSubject;
71import org.onosproject.store.impl.MastershipBasedTimestamp;
72import org.onosproject.store.serializers.KryoNamespaces;
73import org.onosproject.store.serializers.KryoSerializer;
74import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
75import org.onosproject.store.service.EventuallyConsistentMap;
76import org.onosproject.store.service.EventuallyConsistentMapEvent;
77import org.onosproject.store.service.EventuallyConsistentMapListener;
78import org.onosproject.store.service.StorageService;
79import org.slf4j.Logger;
80
81import com.google.common.collect.Iterables;
82import com.google.common.collect.Maps;
83import com.google.common.util.concurrent.Futures;
84
85/**
86 * Manages the inventory of links using a {@code EventuallyConsistentMap}.
87 */
Madan Jampani8856ee12015-08-21 09:24:27 -070088@Component(immediate = true, enabled = true)
Madan Jampanibad538c2015-08-19 17:35:27 -070089@Service
90public class ECLinkStore
91 extends AbstractStore<LinkEvent, LinkStoreDelegate>
92 implements LinkStore {
93
94 private final Logger log = getLogger(getClass());
95
96 private final Map<LinkKey, Link> links = Maps.newConcurrentMap();
97 private EventuallyConsistentMap<Provided<LinkKey>, LinkDescription> linkDescriptions;
98
99 private static final MessageSubject LINK_INJECT_MESSAGE = new MessageSubject("inject-link-request");
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected StorageService storageService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected MastershipService mastershipService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected DeviceClockService deviceClockService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected ClusterCommunicationService clusterCommunicator;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ClusterService clusterService;
115
116 private EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> linkTracker =
117 new InternalLinkTracker();
118
119 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
120 @Override
121 protected void setupKryoPool() {
122 serializerPool = KryoNamespace.newBuilder()
123 .register(DistributedStoreSerializers.STORE_COMMON)
124 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
125 .register(Provided.class)
126 .build();
127 }
128 };
129
130 @Activate
131 public void activate() {
132 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
133 .register(KryoNamespaces.API)
134 .register(MastershipBasedTimestamp.class)
135 .register(Provided.class);
136
137 linkDescriptions = storageService.<Provided<LinkKey>, LinkDescription>eventuallyConsistentMapBuilder()
138 .withName("onos-link-descriptions")
139 .withSerializer(serializer)
140 .withTimestampProvider((k, v) -> {
141 try {
142 return v == null ? null : deviceClockService.getTimestamp(v.dst().deviceId());
143 } catch (IllegalStateException e) {
144 return null;
145 }
146 }).build();
147
148 clusterCommunicator.addSubscriber(LINK_INJECT_MESSAGE,
149 SERIALIZER::decode,
150 this::injectLink,
Madan Jampanif8d572f2015-08-25 12:14:43 -0700151 SERIALIZER::encode,
Madan Jampanibad538c2015-08-19 17:35:27 -0700152 SharedExecutors.getPoolThreadExecutor());
153
154 linkDescriptions.addListener(linkTracker);
155
156 log.info("Started");
157 }
158
159 @Deactivate
160 public void deactivate() {
161 linkDescriptions.removeListener(linkTracker);
162 linkDescriptions.destroy();
163 links.clear();
164 clusterCommunicator.removeSubscriber(LINK_INJECT_MESSAGE);
165
166 log.info("Stopped");
167 }
168
169 @Override
170 public int getLinkCount() {
171 return links.size();
172 }
173
174 @Override
175 public Iterable<Link> getLinks() {
176 return links.values();
177 }
178
179 @Override
180 public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
181 return filter(links.values(), link -> deviceId.equals(link.src().deviceId()));
182 }
183
184 @Override
185 public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
186 return filter(links.values(), link -> deviceId.equals(link.dst().deviceId()));
187 }
188
189 @Override
190 public Link getLink(ConnectPoint src, ConnectPoint dst) {
191 return links.get(linkKey(src, dst));
192 }
193
194 @Override
195 public Set<Link> getEgressLinks(ConnectPoint src) {
196 return filter(links.values(), link -> src.equals(link.src()));
197 }
198
199 @Override
200 public Set<Link> getIngressLinks(ConnectPoint dst) {
201 return filter(links.values(), link -> dst.equals(link.dst()));
202 }
203
204 @Override
205 public LinkEvent createOrUpdateLink(ProviderId providerId,
206 LinkDescription linkDescription) {
207 final DeviceId dstDeviceId = linkDescription.dst().deviceId();
208 final NodeId dstNodeId = mastershipService.getMasterFor(dstDeviceId);
209
210 // Process link update only if we're the master of the destination node,
211 // otherwise signal the actual master.
212 if (clusterService.getLocalNode().id().equals(dstNodeId)) {
213 LinkKey linkKey = linkKey(linkDescription.src(), linkDescription.dst());
Ayaka Koshibe2c59acf2015-09-08 15:37:47 -0700214 Provided<LinkKey> internalLinkKey = getProvided(linkKey, providerId);
215 if (internalLinkKey == null) {
216 return null;
217 }
Jian Li68c4fc42016-01-11 16:07:03 -0800218 linkDescriptions.compute(internalLinkKey, (k, v) -> createOrUpdateLinkInternal(v, linkDescription));
Madan Jampanibad538c2015-08-19 17:35:27 -0700219 return refreshLinkCache(linkKey);
220 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800221 // Only forward for ConfigProvider
222 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800223 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800224 return null;
225 }
226 // Temporary hack for NPE (ONOS-1171).
227 // Proper fix is to implement forwarding to master on ConfigProvider
Madan Jampanibad538c2015-08-19 17:35:27 -0700228 if (dstNodeId == null) {
229 return null;
230 }
231 return Futures.getUnchecked(clusterCommunicator.sendAndReceive(new Provided<>(linkDescription, providerId),
232 LINK_INJECT_MESSAGE,
233 SERIALIZER::encode,
234 SERIALIZER::decode,
235 dstNodeId));
236 }
237 }
238
Ayaka Koshibe2c59acf2015-09-08 15:37:47 -0700239 private Provided<LinkKey> getProvided(LinkKey linkKey, ProviderId provId) {
240 ProviderId bpid = getBaseProviderId(linkKey);
241 if (provId == null) {
242 // The LinkService didn't know who this LinkKey belongs to.
243 // A fix is to either modify the getProvider() in LinkService classes
244 // or expose the contents of linkDescriptions to the LinkService.
245 return (bpid == null) ? null : new Provided<>(linkKey, bpid);
246 } else {
247 return new Provided<>(linkKey, provId);
248 }
249 }
250
Madan Jampanibad538c2015-08-19 17:35:27 -0700251 private LinkDescription createOrUpdateLinkInternal(LinkDescription current, LinkDescription updated) {
252 if (current != null) {
253 // we only allow transition from INDIRECT -> DIRECT
254 return new DefaultLinkDescription(
255 current.src(),
256 current.dst(),
257 current.type() == DIRECT ? DIRECT : updated.type(),
258 union(current.annotations(), updated.annotations()));
259 }
260 return updated;
261 }
262
263 private LinkEvent refreshLinkCache(LinkKey linkKey) {
264 AtomicReference<LinkEvent.Type> eventType = new AtomicReference<>();
265 Link link = links.compute(linkKey, (key, existingLink) -> {
266 Link newLink = composeLink(linkKey);
267 if (existingLink == null) {
268 eventType.set(LINK_ADDED);
269 return newLink;
270 } else if (existingLink.state() != newLink.state() ||
271 (existingLink.type() == INDIRECT && newLink.type() == DIRECT) ||
272 !AnnotationsUtil.isEqual(existingLink.annotations(), newLink.annotations())) {
273 eventType.set(LINK_UPDATED);
274 return newLink;
275 } else {
276 return existingLink;
277 }
278 });
279 return eventType.get() != null ? new LinkEvent(eventType.get(), link) : null;
280 }
281
282 private Set<ProviderId> getAllProviders(LinkKey linkKey) {
283 return linkDescriptions.keySet()
284 .stream()
285 .filter(key -> key.key().equals(linkKey))
286 .map(key -> key.providerId())
287 .collect(Collectors.toSet());
288 }
289
290 private ProviderId getBaseProviderId(LinkKey linkKey) {
291 Set<ProviderId> allProviders = getAllProviders(linkKey);
292 if (allProviders.size() > 0) {
293 return allProviders.stream()
294 .filter(p -> !p.isAncillary())
295 .findFirst()
296 .orElse(Iterables.getFirst(allProviders, null));
297 }
298 return null;
299 }
300
301 private Link composeLink(LinkKey linkKey) {
302
303 ProviderId baseProviderId = checkNotNull(getBaseProviderId(linkKey));
304 LinkDescription base = linkDescriptions.get(new Provided<>(linkKey, baseProviderId));
305
306 ConnectPoint src = base.src();
307 ConnectPoint dst = base.dst();
308 Type type = base.type();
309 AtomicReference<DefaultAnnotations> annotations = new AtomicReference<>(DefaultAnnotations.builder().build());
310 annotations.set(merge(annotations.get(), base.annotations()));
311
312 getAllProviders(linkKey).stream()
313 .map(p -> new Provided<>(linkKey, p))
314 .forEach(key -> {
315 annotations.set(merge(annotations.get(),
316 linkDescriptions.get(key).annotations()));
317 });
318
319 boolean isDurable = Objects.equals(annotations.get().value(AnnotationKeys.DURABLE), "true");
320 return new DefaultLink(baseProviderId, src, dst, type, ACTIVE, isDurable, annotations.get());
321 }
322
323 // Updates, if necessary the specified link and returns the appropriate event.
324 // Guarded by linkDescs value (=locking each Link)
325 private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
326 // Note: INDIRECT -> DIRECT transition only
327 // so that BDDP discovered Link will not overwrite LDDP Link
328 if (oldLink.state() != newLink.state() ||
329 (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
330 !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
331
332 links.put(key, newLink);
333 return new LinkEvent(LINK_UPDATED, newLink);
334 }
335 return null;
336 }
337
338 @Override
339 public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) {
340 Link link = getLink(src, dst);
341 if (link == null) {
342 return null;
343 }
344
345 if (link.isDurable()) {
346 // FIXME: this will not sync link state!!!
347 return link.state() == INACTIVE ? null :
348 updateLink(linkKey(link.src(), link.dst()), link,
349 new DefaultLink(link.providerId(),
350 link.src(), link.dst(),
351 link.type(), INACTIVE,
352 link.isDurable(),
353 link.annotations()));
354 }
355 return removeLink(src, dst);
356 }
357
358 @Override
359 public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
360 final LinkKey linkKey = LinkKey.linkKey(src, dst);
Madan Jampani5d4396f2015-09-02 16:04:20 -0700361 ProviderId primaryProviderId = getBaseProviderId(linkKey);
362 // Stop if there is no base provider.
363 if (primaryProviderId == null) {
364 return null;
365 }
Madan Jampanibad538c2015-08-19 17:35:27 -0700366 LinkDescription removedLinkDescription =
Madan Jampani5d4396f2015-09-02 16:04:20 -0700367 linkDescriptions.remove(new Provided<>(linkKey, primaryProviderId));
Madan Jampanibad538c2015-08-19 17:35:27 -0700368 if (removedLinkDescription != null) {
369 return purgeLinkCache(linkKey);
370 }
371 return null;
372 }
373
374 private LinkEvent purgeLinkCache(LinkKey linkKey) {
375 Link removedLink = links.remove(linkKey);
376 if (removedLink != null) {
377 getAllProviders(linkKey).forEach(p -> linkDescriptions.remove(new Provided<>(linkKey, p)));
378 return new LinkEvent(LINK_REMOVED, removedLink);
379 }
380 return null;
381 }
382
383 private Set<Link> filter(Collection<Link> links, Predicate<Link> predicate) {
384 return links.stream().filter(predicate).collect(Collectors.toSet());
385 }
386
387 private LinkEvent injectLink(Provided<LinkDescription> linkInjectRequest) {
388 log.trace("Received request to inject link {}", linkInjectRequest);
389
390 ProviderId providerId = linkInjectRequest.providerId();
391 LinkDescription linkDescription = linkInjectRequest.key();
392
393 final DeviceId deviceId = linkDescription.dst().deviceId();
394 if (!deviceClockService.isTimestampAvailable(deviceId)) {
395 // workaround for ONOS-1208
396 log.warn("Not ready to accept update. Dropping {}", linkInjectRequest);
397 return null;
398 }
399 return createOrUpdateLink(providerId, linkDescription);
400 }
401
402 private class InternalLinkTracker implements EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> {
403 @Override
404 public void event(EventuallyConsistentMapEvent<Provided<LinkKey>, LinkDescription> event) {
405 if (event.type() == PUT) {
406 notifyDelegate(refreshLinkCache(event.key().key()));
407 } else if (event.type() == REMOVE) {
408 notifyDelegate(purgeLinkCache(event.key().key()));
409 }
410 }
411 }
Ray Milkeyd0dd1352016-01-19 10:58:41 -0800412}