blob: 678e242a55db126ba591a88e81814e1ed4bb7d8d [file] [log] [blame]
Madan Jampanibad538c2015-08-19 17:35:27 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.link.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static org.onosproject.net.DefaultAnnotations.merge;
20import static org.onosproject.net.DefaultAnnotations.union;
21import static org.onosproject.net.Link.State.ACTIVE;
22import static org.onosproject.net.Link.State.INACTIVE;
23import static org.onosproject.net.Link.Type.DIRECT;
24import static org.onosproject.net.Link.Type.INDIRECT;
25import static org.onosproject.net.LinkKey.linkKey;
26import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
27import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
28import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
29import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
30import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
31import static org.slf4j.LoggerFactory.getLogger;
32
33import java.util.Collection;
34import java.util.Map;
35import java.util.Objects;
36import java.util.Set;
37import java.util.concurrent.atomic.AtomicReference;
38import java.util.function.Predicate;
39import java.util.stream.Collectors;
40
41import org.apache.felix.scr.annotations.Activate;
42import org.apache.felix.scr.annotations.Component;
43import org.apache.felix.scr.annotations.Deactivate;
44import org.apache.felix.scr.annotations.Reference;
45import org.apache.felix.scr.annotations.ReferenceCardinality;
46import org.apache.felix.scr.annotations.Service;
47import org.onlab.util.KryoNamespace;
48import org.onlab.util.SharedExecutors;
49import org.onosproject.cluster.ClusterService;
50import org.onosproject.cluster.NodeId;
51import org.onosproject.mastership.MastershipService;
52import org.onosproject.net.AnnotationKeys;
53import org.onosproject.net.AnnotationsUtil;
54import org.onosproject.net.ConnectPoint;
55import org.onosproject.net.DefaultAnnotations;
56import org.onosproject.net.DefaultLink;
57import org.onosproject.net.DeviceId;
58import org.onosproject.net.Link;
59import org.onosproject.net.LinkKey;
60import org.onosproject.net.Link.Type;
61import org.onosproject.net.device.DeviceClockService;
62import org.onosproject.net.link.DefaultLinkDescription;
63import org.onosproject.net.link.LinkDescription;
64import org.onosproject.net.link.LinkEvent;
65import org.onosproject.net.link.LinkStore;
66import org.onosproject.net.link.LinkStoreDelegate;
67import org.onosproject.net.provider.ProviderId;
68import org.onosproject.store.AbstractStore;
69import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
70import org.onosproject.store.cluster.messaging.MessageSubject;
71import org.onosproject.store.impl.MastershipBasedTimestamp;
72import org.onosproject.store.serializers.KryoNamespaces;
73import org.onosproject.store.serializers.KryoSerializer;
74import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
75import org.onosproject.store.service.EventuallyConsistentMap;
76import org.onosproject.store.service.EventuallyConsistentMapEvent;
77import org.onosproject.store.service.EventuallyConsistentMapListener;
78import org.onosproject.store.service.StorageService;
79import org.slf4j.Logger;
80
81import com.google.common.collect.Iterables;
82import com.google.common.collect.Maps;
83import com.google.common.util.concurrent.Futures;
84
85/**
86 * Manages the inventory of links using a {@code EventuallyConsistentMap}.
87 */
Madan Jampani8856ee12015-08-21 09:24:27 -070088@Component(immediate = true, enabled = true)
Madan Jampanibad538c2015-08-19 17:35:27 -070089@Service
90public class ECLinkStore
91 extends AbstractStore<LinkEvent, LinkStoreDelegate>
92 implements LinkStore {
93
94 private final Logger log = getLogger(getClass());
95
96 private final Map<LinkKey, Link> links = Maps.newConcurrentMap();
97 private EventuallyConsistentMap<Provided<LinkKey>, LinkDescription> linkDescriptions;
98
99 private static final MessageSubject LINK_INJECT_MESSAGE = new MessageSubject("inject-link-request");
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 protected StorageService storageService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected MastershipService mastershipService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 protected DeviceClockService deviceClockService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 protected ClusterCommunicationService clusterCommunicator;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected ClusterService clusterService;
115
116 private EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> linkTracker =
117 new InternalLinkTracker();
118
119 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
120 @Override
121 protected void setupKryoPool() {
122 serializerPool = KryoNamespace.newBuilder()
123 .register(DistributedStoreSerializers.STORE_COMMON)
124 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
125 .register(Provided.class)
126 .build();
127 }
128 };
129
130 @Activate
131 public void activate() {
132 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
133 .register(KryoNamespaces.API)
134 .register(MastershipBasedTimestamp.class)
135 .register(Provided.class);
136
137 linkDescriptions = storageService.<Provided<LinkKey>, LinkDescription>eventuallyConsistentMapBuilder()
138 .withName("onos-link-descriptions")
139 .withSerializer(serializer)
140 .withTimestampProvider((k, v) -> {
141 try {
142 return v == null ? null : deviceClockService.getTimestamp(v.dst().deviceId());
143 } catch (IllegalStateException e) {
144 return null;
145 }
146 }).build();
147
148 clusterCommunicator.addSubscriber(LINK_INJECT_MESSAGE,
149 SERIALIZER::decode,
150 this::injectLink,
Madan Jampanif8d572f2015-08-25 12:14:43 -0700151 SERIALIZER::encode,
Madan Jampanibad538c2015-08-19 17:35:27 -0700152 SharedExecutors.getPoolThreadExecutor());
153
154 linkDescriptions.addListener(linkTracker);
155
156 log.info("Started");
157 }
158
159 @Deactivate
160 public void deactivate() {
161 linkDescriptions.removeListener(linkTracker);
162 linkDescriptions.destroy();
163 links.clear();
164 clusterCommunicator.removeSubscriber(LINK_INJECT_MESSAGE);
165
166 log.info("Stopped");
167 }
168
169 @Override
170 public int getLinkCount() {
171 return links.size();
172 }
173
174 @Override
175 public Iterable<Link> getLinks() {
176 return links.values();
177 }
178
179 @Override
180 public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
181 return filter(links.values(), link -> deviceId.equals(link.src().deviceId()));
182 }
183
184 @Override
185 public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
186 return filter(links.values(), link -> deviceId.equals(link.dst().deviceId()));
187 }
188
189 @Override
190 public Link getLink(ConnectPoint src, ConnectPoint dst) {
191 return links.get(linkKey(src, dst));
192 }
193
194 @Override
195 public Set<Link> getEgressLinks(ConnectPoint src) {
196 return filter(links.values(), link -> src.equals(link.src()));
197 }
198
199 @Override
200 public Set<Link> getIngressLinks(ConnectPoint dst) {
201 return filter(links.values(), link -> dst.equals(link.dst()));
202 }
203
204 @Override
205 public LinkEvent createOrUpdateLink(ProviderId providerId,
206 LinkDescription linkDescription) {
207 final DeviceId dstDeviceId = linkDescription.dst().deviceId();
208 final NodeId dstNodeId = mastershipService.getMasterFor(dstDeviceId);
209
210 // Process link update only if we're the master of the destination node,
211 // otherwise signal the actual master.
212 if (clusterService.getLocalNode().id().equals(dstNodeId)) {
213 LinkKey linkKey = linkKey(linkDescription.src(), linkDescription.dst());
214 Provided<LinkKey> internalLinkKey = new Provided<>(linkKey, providerId);
215 linkDescriptions.compute(internalLinkKey, (k, v) -> createOrUpdateLinkInternal(v , linkDescription));
216 return refreshLinkCache(linkKey);
217 } else {
218 if (dstNodeId == null) {
219 return null;
220 }
221 return Futures.getUnchecked(clusterCommunicator.sendAndReceive(new Provided<>(linkDescription, providerId),
222 LINK_INJECT_MESSAGE,
223 SERIALIZER::encode,
224 SERIALIZER::decode,
225 dstNodeId));
226 }
227 }
228
229 private LinkDescription createOrUpdateLinkInternal(LinkDescription current, LinkDescription updated) {
230 if (current != null) {
231 // we only allow transition from INDIRECT -> DIRECT
232 return new DefaultLinkDescription(
233 current.src(),
234 current.dst(),
235 current.type() == DIRECT ? DIRECT : updated.type(),
236 union(current.annotations(), updated.annotations()));
237 }
238 return updated;
239 }
240
241 private LinkEvent refreshLinkCache(LinkKey linkKey) {
242 AtomicReference<LinkEvent.Type> eventType = new AtomicReference<>();
243 Link link = links.compute(linkKey, (key, existingLink) -> {
244 Link newLink = composeLink(linkKey);
245 if (existingLink == null) {
246 eventType.set(LINK_ADDED);
247 return newLink;
248 } else if (existingLink.state() != newLink.state() ||
249 (existingLink.type() == INDIRECT && newLink.type() == DIRECT) ||
250 !AnnotationsUtil.isEqual(existingLink.annotations(), newLink.annotations())) {
251 eventType.set(LINK_UPDATED);
252 return newLink;
253 } else {
254 return existingLink;
255 }
256 });
257 return eventType.get() != null ? new LinkEvent(eventType.get(), link) : null;
258 }
259
260 private Set<ProviderId> getAllProviders(LinkKey linkKey) {
261 return linkDescriptions.keySet()
262 .stream()
263 .filter(key -> key.key().equals(linkKey))
264 .map(key -> key.providerId())
265 .collect(Collectors.toSet());
266 }
267
268 private ProviderId getBaseProviderId(LinkKey linkKey) {
269 Set<ProviderId> allProviders = getAllProviders(linkKey);
270 if (allProviders.size() > 0) {
271 return allProviders.stream()
272 .filter(p -> !p.isAncillary())
273 .findFirst()
274 .orElse(Iterables.getFirst(allProviders, null));
275 }
276 return null;
277 }
278
279 private Link composeLink(LinkKey linkKey) {
280
281 ProviderId baseProviderId = checkNotNull(getBaseProviderId(linkKey));
282 LinkDescription base = linkDescriptions.get(new Provided<>(linkKey, baseProviderId));
283
284 ConnectPoint src = base.src();
285 ConnectPoint dst = base.dst();
286 Type type = base.type();
287 AtomicReference<DefaultAnnotations> annotations = new AtomicReference<>(DefaultAnnotations.builder().build());
288 annotations.set(merge(annotations.get(), base.annotations()));
289
290 getAllProviders(linkKey).stream()
291 .map(p -> new Provided<>(linkKey, p))
292 .forEach(key -> {
293 annotations.set(merge(annotations.get(),
294 linkDescriptions.get(key).annotations()));
295 });
296
297 boolean isDurable = Objects.equals(annotations.get().value(AnnotationKeys.DURABLE), "true");
298 return new DefaultLink(baseProviderId, src, dst, type, ACTIVE, isDurable, annotations.get());
299 }
300
301 // Updates, if necessary the specified link and returns the appropriate event.
302 // Guarded by linkDescs value (=locking each Link)
303 private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
304 // Note: INDIRECT -> DIRECT transition only
305 // so that BDDP discovered Link will not overwrite LDDP Link
306 if (oldLink.state() != newLink.state() ||
307 (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
308 !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
309
310 links.put(key, newLink);
311 return new LinkEvent(LINK_UPDATED, newLink);
312 }
313 return null;
314 }
315
316 @Override
317 public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) {
318 Link link = getLink(src, dst);
319 if (link == null) {
320 return null;
321 }
322
323 if (link.isDurable()) {
324 // FIXME: this will not sync link state!!!
325 return link.state() == INACTIVE ? null :
326 updateLink(linkKey(link.src(), link.dst()), link,
327 new DefaultLink(link.providerId(),
328 link.src(), link.dst(),
329 link.type(), INACTIVE,
330 link.isDurable(),
331 link.annotations()));
332 }
333 return removeLink(src, dst);
334 }
335
336 @Override
337 public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
338 final LinkKey linkKey = LinkKey.linkKey(src, dst);
339 LinkDescription removedLinkDescription =
340 linkDescriptions.remove(new Provided<>(linkKey, checkNotNull(getBaseProviderId(linkKey))));
341 if (removedLinkDescription != null) {
342 return purgeLinkCache(linkKey);
343 }
344 return null;
345 }
346
347 private LinkEvent purgeLinkCache(LinkKey linkKey) {
348 Link removedLink = links.remove(linkKey);
349 if (removedLink != null) {
350 getAllProviders(linkKey).forEach(p -> linkDescriptions.remove(new Provided<>(linkKey, p)));
351 return new LinkEvent(LINK_REMOVED, removedLink);
352 }
353 return null;
354 }
355
356 private Set<Link> filter(Collection<Link> links, Predicate<Link> predicate) {
357 return links.stream().filter(predicate).collect(Collectors.toSet());
358 }
359
360 private LinkEvent injectLink(Provided<LinkDescription> linkInjectRequest) {
361 log.trace("Received request to inject link {}", linkInjectRequest);
362
363 ProviderId providerId = linkInjectRequest.providerId();
364 LinkDescription linkDescription = linkInjectRequest.key();
365
366 final DeviceId deviceId = linkDescription.dst().deviceId();
367 if (!deviceClockService.isTimestampAvailable(deviceId)) {
368 // workaround for ONOS-1208
369 log.warn("Not ready to accept update. Dropping {}", linkInjectRequest);
370 return null;
371 }
372 return createOrUpdateLink(providerId, linkDescription);
373 }
374
375 private class InternalLinkTracker implements EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> {
376 @Override
377 public void event(EventuallyConsistentMapEvent<Provided<LinkKey>, LinkDescription> event) {
378 if (event.type() == PUT) {
379 notifyDelegate(refreshLinkCache(event.key().key()));
380 } else if (event.type() == REMOVE) {
381 notifyDelegate(purgeLinkCache(event.key().key()));
382 }
383 }
384 }
385}