blob: 4f56f4011a538b137d0188c0fa152e78381e090a [file] [log] [blame]
Madan Jampanibad538c2015-08-19 17:35:27 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Madan Jampanibad538c2015-08-19 17:35:27 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.link.impl;
17
Madan Jampanibad538c2015-08-19 17:35:27 -070018import java.util.Collection;
19import java.util.Map;
20import java.util.Objects;
21import java.util.Set;
22import java.util.concurrent.atomic.AtomicReference;
23import java.util.function.Predicate;
24import java.util.stream.Collectors;
25
26import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onlab.util.KryoNamespace;
33import org.onlab.util.SharedExecutors;
34import org.onosproject.cluster.ClusterService;
35import org.onosproject.cluster.NodeId;
Ray Milkeyb7f0f642016-01-22 16:08:14 -080036import org.onosproject.core.ApplicationId;
37import org.onosproject.core.CoreService;
Madan Jampanibad538c2015-08-19 17:35:27 -070038import org.onosproject.mastership.MastershipService;
39import org.onosproject.net.AnnotationKeys;
40import org.onosproject.net.AnnotationsUtil;
41import org.onosproject.net.ConnectPoint;
42import org.onosproject.net.DefaultAnnotations;
43import org.onosproject.net.DefaultLink;
44import org.onosproject.net.DeviceId;
45import org.onosproject.net.Link;
Madan Jampanibad538c2015-08-19 17:35:27 -070046import org.onosproject.net.Link.Type;
Ray Milkeyb7f0f642016-01-22 16:08:14 -080047import org.onosproject.net.LinkKey;
48import org.onosproject.net.config.ConfigFactory;
49import org.onosproject.net.config.NetworkConfigEvent;
50import org.onosproject.net.config.NetworkConfigListener;
51import org.onosproject.net.config.NetworkConfigRegistry;
Madan Jampanibad538c2015-08-19 17:35:27 -070052import org.onosproject.net.device.DeviceClockService;
53import org.onosproject.net.link.DefaultLinkDescription;
54import org.onosproject.net.link.LinkDescription;
55import org.onosproject.net.link.LinkEvent;
56import org.onosproject.net.link.LinkStore;
57import org.onosproject.net.link.LinkStoreDelegate;
58import org.onosproject.net.provider.ProviderId;
59import org.onosproject.store.AbstractStore;
60import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
61import org.onosproject.store.cluster.messaging.MessageSubject;
62import org.onosproject.store.impl.MastershipBasedTimestamp;
63import org.onosproject.store.serializers.KryoNamespaces;
64import org.onosproject.store.serializers.KryoSerializer;
65import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
66import org.onosproject.store.service.EventuallyConsistentMap;
67import org.onosproject.store.service.EventuallyConsistentMapEvent;
68import org.onosproject.store.service.EventuallyConsistentMapListener;
69import org.onosproject.store.service.StorageService;
70import org.slf4j.Logger;
71
72import com.google.common.collect.Iterables;
73import com.google.common.collect.Maps;
74import com.google.common.util.concurrent.Futures;
75
Ray Milkeyb7f0f642016-01-22 16:08:14 -080076import static com.google.common.base.Preconditions.checkNotNull;
77import static org.onosproject.net.DefaultAnnotations.merge;
78import static org.onosproject.net.DefaultAnnotations.union;
79import static org.onosproject.net.Link.State.ACTIVE;
80import static org.onosproject.net.Link.State.INACTIVE;
81import static org.onosproject.net.Link.Type.DIRECT;
82import static org.onosproject.net.Link.Type.INDIRECT;
83import static org.onosproject.net.LinkKey.linkKey;
84import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
85import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
86import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
87import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
88import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
89import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
90import static org.slf4j.LoggerFactory.getLogger;
91
Madan Jampanibad538c2015-08-19 17:35:27 -070092/**
93 * Manages the inventory of links using a {@code EventuallyConsistentMap}.
94 */
Madan Jampani8856ee12015-08-21 09:24:27 -070095@Component(immediate = true, enabled = true)
Madan Jampanibad538c2015-08-19 17:35:27 -070096@Service
97public class ECLinkStore
98 extends AbstractStore<LinkEvent, LinkStoreDelegate>
99 implements LinkStore {
100
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800101 /**
102 * Modes for dealing with newly discovered links.
103 */
104 protected enum LinkDiscoveryMode {
105 /**
106 * Permissive mode - all newly discovered links are valid.
107 */
108 PERMISSIVE,
109
110 /**
111 * Strict mode - all newly discovered links must be defined in
112 * the network config.
113 */
114 STRICT
115 }
116
Madan Jampanibad538c2015-08-19 17:35:27 -0700117 private final Logger log = getLogger(getClass());
118
119 private final Map<LinkKey, Link> links = Maps.newConcurrentMap();
120 private EventuallyConsistentMap<Provided<LinkKey>, LinkDescription> linkDescriptions;
121
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800122 private ApplicationId appId;
123
Madan Jampanibad538c2015-08-19 17:35:27 -0700124 private static final MessageSubject LINK_INJECT_MESSAGE = new MessageSubject("inject-link-request");
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
127 protected StorageService storageService;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
130 protected MastershipService mastershipService;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected DeviceClockService deviceClockService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
136 protected ClusterCommunicationService clusterCommunicator;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139 protected ClusterService clusterService;
140
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142 protected NetworkConfigRegistry netCfgService;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
145 protected CoreService coreService;
146
Madan Jampanibad538c2015-08-19 17:35:27 -0700147 private EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> linkTracker =
148 new InternalLinkTracker();
149
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800150 // Listener for config changes
151 private final InternalConfigListener cfgListener = new InternalConfigListener();
152
153 protected LinkDiscoveryMode linkDiscoveryMode = LinkDiscoveryMode.STRICT;
154
Madan Jampanibad538c2015-08-19 17:35:27 -0700155 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
156 @Override
157 protected void setupKryoPool() {
158 serializerPool = KryoNamespace.newBuilder()
159 .register(DistributedStoreSerializers.STORE_COMMON)
160 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
161 .register(Provided.class)
162 .build();
163 }
164 };
165
166 @Activate
167 public void activate() {
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800168 appId = coreService.registerApplication("org.onosproject.core");
169 netCfgService.registerConfigFactory(factory);
170 netCfgService.addListener(cfgListener);
171
172 cfgListener.reconfigure(netCfgService.getConfig(appId, CoreConfig.class));
173
Madan Jampanibad538c2015-08-19 17:35:27 -0700174 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
175 .register(KryoNamespaces.API)
176 .register(MastershipBasedTimestamp.class)
177 .register(Provided.class);
178
179 linkDescriptions = storageService.<Provided<LinkKey>, LinkDescription>eventuallyConsistentMapBuilder()
180 .withName("onos-link-descriptions")
181 .withSerializer(serializer)
182 .withTimestampProvider((k, v) -> {
183 try {
184 return v == null ? null : deviceClockService.getTimestamp(v.dst().deviceId());
185 } catch (IllegalStateException e) {
186 return null;
187 }
188 }).build();
189
190 clusterCommunicator.addSubscriber(LINK_INJECT_MESSAGE,
191 SERIALIZER::decode,
192 this::injectLink,
Madan Jampanif8d572f2015-08-25 12:14:43 -0700193 SERIALIZER::encode,
Madan Jampanibad538c2015-08-19 17:35:27 -0700194 SharedExecutors.getPoolThreadExecutor());
195
196 linkDescriptions.addListener(linkTracker);
197
198 log.info("Started");
199 }
200
201 @Deactivate
202 public void deactivate() {
203 linkDescriptions.removeListener(linkTracker);
204 linkDescriptions.destroy();
205 links.clear();
206 clusterCommunicator.removeSubscriber(LINK_INJECT_MESSAGE);
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800207 netCfgService.removeListener(cfgListener);
208 netCfgService.unregisterConfigFactory(factory);
Madan Jampanibad538c2015-08-19 17:35:27 -0700209
210 log.info("Stopped");
211 }
212
213 @Override
214 public int getLinkCount() {
215 return links.size();
216 }
217
218 @Override
219 public Iterable<Link> getLinks() {
220 return links.values();
221 }
222
223 @Override
224 public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
225 return filter(links.values(), link -> deviceId.equals(link.src().deviceId()));
226 }
227
228 @Override
229 public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
230 return filter(links.values(), link -> deviceId.equals(link.dst().deviceId()));
231 }
232
233 @Override
234 public Link getLink(ConnectPoint src, ConnectPoint dst) {
235 return links.get(linkKey(src, dst));
236 }
237
238 @Override
239 public Set<Link> getEgressLinks(ConnectPoint src) {
240 return filter(links.values(), link -> src.equals(link.src()));
241 }
242
243 @Override
244 public Set<Link> getIngressLinks(ConnectPoint dst) {
245 return filter(links.values(), link -> dst.equals(link.dst()));
246 }
247
248 @Override
249 public LinkEvent createOrUpdateLink(ProviderId providerId,
250 LinkDescription linkDescription) {
251 final DeviceId dstDeviceId = linkDescription.dst().deviceId();
252 final NodeId dstNodeId = mastershipService.getMasterFor(dstDeviceId);
253
254 // Process link update only if we're the master of the destination node,
255 // otherwise signal the actual master.
256 if (clusterService.getLocalNode().id().equals(dstNodeId)) {
257 LinkKey linkKey = linkKey(linkDescription.src(), linkDescription.dst());
Ayaka Koshibe2c59acf2015-09-08 15:37:47 -0700258 Provided<LinkKey> internalLinkKey = getProvided(linkKey, providerId);
259 if (internalLinkKey == null) {
260 return null;
261 }
Jian Li68c4fc42016-01-11 16:07:03 -0800262 linkDescriptions.compute(internalLinkKey, (k, v) -> createOrUpdateLinkInternal(v, linkDescription));
Madan Jampanibad538c2015-08-19 17:35:27 -0700263 return refreshLinkCache(linkKey);
264 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800265 // Only forward for ConfigProvider
266 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800267 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800268 return null;
269 }
270 // Temporary hack for NPE (ONOS-1171).
271 // Proper fix is to implement forwarding to master on ConfigProvider
Madan Jampanibad538c2015-08-19 17:35:27 -0700272 if (dstNodeId == null) {
273 return null;
274 }
275 return Futures.getUnchecked(clusterCommunicator.sendAndReceive(new Provided<>(linkDescription, providerId),
276 LINK_INJECT_MESSAGE,
277 SERIALIZER::encode,
278 SERIALIZER::decode,
279 dstNodeId));
280 }
281 }
282
Ayaka Koshibe2c59acf2015-09-08 15:37:47 -0700283 private Provided<LinkKey> getProvided(LinkKey linkKey, ProviderId provId) {
284 ProviderId bpid = getBaseProviderId(linkKey);
285 if (provId == null) {
286 // The LinkService didn't know who this LinkKey belongs to.
287 // A fix is to either modify the getProvider() in LinkService classes
288 // or expose the contents of linkDescriptions to the LinkService.
289 return (bpid == null) ? null : new Provided<>(linkKey, bpid);
290 } else {
291 return new Provided<>(linkKey, provId);
292 }
293 }
294
Madan Jampanibad538c2015-08-19 17:35:27 -0700295 private LinkDescription createOrUpdateLinkInternal(LinkDescription current, LinkDescription updated) {
296 if (current != null) {
297 // we only allow transition from INDIRECT -> DIRECT
298 return new DefaultLinkDescription(
299 current.src(),
300 current.dst(),
301 current.type() == DIRECT ? DIRECT : updated.type(),
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800302 current.isExpected(),
Madan Jampanibad538c2015-08-19 17:35:27 -0700303 union(current.annotations(), updated.annotations()));
304 }
305 return updated;
306 }
307
308 private LinkEvent refreshLinkCache(LinkKey linkKey) {
309 AtomicReference<LinkEvent.Type> eventType = new AtomicReference<>();
310 Link link = links.compute(linkKey, (key, existingLink) -> {
311 Link newLink = composeLink(linkKey);
312 if (existingLink == null) {
313 eventType.set(LINK_ADDED);
314 return newLink;
315 } else if (existingLink.state() != newLink.state() ||
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800316 existingLink.isExpected() != newLink.isExpected() ||
Madan Jampanibad538c2015-08-19 17:35:27 -0700317 (existingLink.type() == INDIRECT && newLink.type() == DIRECT) ||
318 !AnnotationsUtil.isEqual(existingLink.annotations(), newLink.annotations())) {
319 eventType.set(LINK_UPDATED);
320 return newLink;
321 } else {
322 return existingLink;
323 }
324 });
325 return eventType.get() != null ? new LinkEvent(eventType.get(), link) : null;
326 }
327
328 private Set<ProviderId> getAllProviders(LinkKey linkKey) {
329 return linkDescriptions.keySet()
330 .stream()
331 .filter(key -> key.key().equals(linkKey))
332 .map(key -> key.providerId())
333 .collect(Collectors.toSet());
334 }
335
336 private ProviderId getBaseProviderId(LinkKey linkKey) {
337 Set<ProviderId> allProviders = getAllProviders(linkKey);
338 if (allProviders.size() > 0) {
339 return allProviders.stream()
340 .filter(p -> !p.isAncillary())
341 .findFirst()
342 .orElse(Iterables.getFirst(allProviders, null));
343 }
344 return null;
345 }
346
347 private Link composeLink(LinkKey linkKey) {
348
349 ProviderId baseProviderId = checkNotNull(getBaseProviderId(linkKey));
350 LinkDescription base = linkDescriptions.get(new Provided<>(linkKey, baseProviderId));
351
352 ConnectPoint src = base.src();
353 ConnectPoint dst = base.dst();
354 Type type = base.type();
355 AtomicReference<DefaultAnnotations> annotations = new AtomicReference<>(DefaultAnnotations.builder().build());
356 annotations.set(merge(annotations.get(), base.annotations()));
357
358 getAllProviders(linkKey).stream()
359 .map(p -> new Provided<>(linkKey, p))
360 .forEach(key -> {
361 annotations.set(merge(annotations.get(),
362 linkDescriptions.get(key).annotations()));
363 });
364
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800365 Link.State initialLinkState;
366
367 boolean isExpected;
368 if (linkDiscoveryMode == LinkDiscoveryMode.PERMISSIVE) {
369 initialLinkState = ACTIVE;
370 isExpected =
371 Objects.equals(annotations.get().value(AnnotationKeys.DURABLE), "true");
372 } else {
373 initialLinkState = base.isExpected() ? ACTIVE : INACTIVE;
374 isExpected = base.isExpected();
375 }
376
377
378
379
Ray Milkey2693bda2016-01-22 16:08:14 -0800380 return DefaultLink.builder()
381 .providerId(baseProviderId)
382 .src(src)
383 .dst(dst)
384 .type(type)
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800385 .state(initialLinkState)
386 .isExpected(isExpected)
Ray Milkey2693bda2016-01-22 16:08:14 -0800387 .annotations(annotations.get())
388 .build();
Madan Jampanibad538c2015-08-19 17:35:27 -0700389 }
390
391 // Updates, if necessary the specified link and returns the appropriate event.
392 // Guarded by linkDescs value (=locking each Link)
393 private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
394 // Note: INDIRECT -> DIRECT transition only
395 // so that BDDP discovered Link will not overwrite LDDP Link
396 if (oldLink.state() != newLink.state() ||
397 (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
398 !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
399
400 links.put(key, newLink);
401 return new LinkEvent(LINK_UPDATED, newLink);
402 }
403 return null;
404 }
405
406 @Override
407 public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) {
408 Link link = getLink(src, dst);
409 if (link == null) {
410 return null;
411 }
412
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800413 if (linkDiscoveryMode == LinkDiscoveryMode.PERMISSIVE && link.isExpected()) {
Madan Jampanibad538c2015-08-19 17:35:27 -0700414 // FIXME: this will not sync link state!!!
415 return link.state() == INACTIVE ? null :
416 updateLink(linkKey(link.src(), link.dst()), link,
Ray Milkey2693bda2016-01-22 16:08:14 -0800417 DefaultLink.builder()
418 .providerId(link.providerId())
419 .src(link.src())
420 .dst(link.dst())
421 .type(link.type())
422 .state(INACTIVE)
423 .isExpected(link.isExpected())
424 .annotations(link.annotations())
425 .build());
Madan Jampanibad538c2015-08-19 17:35:27 -0700426 }
427 return removeLink(src, dst);
428 }
429
430 @Override
431 public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
432 final LinkKey linkKey = LinkKey.linkKey(src, dst);
Madan Jampani5d4396f2015-09-02 16:04:20 -0700433 ProviderId primaryProviderId = getBaseProviderId(linkKey);
434 // Stop if there is no base provider.
435 if (primaryProviderId == null) {
436 return null;
437 }
Madan Jampanibad538c2015-08-19 17:35:27 -0700438 LinkDescription removedLinkDescription =
Madan Jampani5d4396f2015-09-02 16:04:20 -0700439 linkDescriptions.remove(new Provided<>(linkKey, primaryProviderId));
Madan Jampanibad538c2015-08-19 17:35:27 -0700440 if (removedLinkDescription != null) {
441 return purgeLinkCache(linkKey);
442 }
443 return null;
444 }
445
446 private LinkEvent purgeLinkCache(LinkKey linkKey) {
447 Link removedLink = links.remove(linkKey);
448 if (removedLink != null) {
449 getAllProviders(linkKey).forEach(p -> linkDescriptions.remove(new Provided<>(linkKey, p)));
450 return new LinkEvent(LINK_REMOVED, removedLink);
451 }
452 return null;
453 }
454
455 private Set<Link> filter(Collection<Link> links, Predicate<Link> predicate) {
456 return links.stream().filter(predicate).collect(Collectors.toSet());
457 }
458
459 private LinkEvent injectLink(Provided<LinkDescription> linkInjectRequest) {
460 log.trace("Received request to inject link {}", linkInjectRequest);
461
462 ProviderId providerId = linkInjectRequest.providerId();
463 LinkDescription linkDescription = linkInjectRequest.key();
464
465 final DeviceId deviceId = linkDescription.dst().deviceId();
466 if (!deviceClockService.isTimestampAvailable(deviceId)) {
467 // workaround for ONOS-1208
468 log.warn("Not ready to accept update. Dropping {}", linkInjectRequest);
469 return null;
470 }
471 return createOrUpdateLink(providerId, linkDescription);
472 }
473
474 private class InternalLinkTracker implements EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> {
475 @Override
476 public void event(EventuallyConsistentMapEvent<Provided<LinkKey>, LinkDescription> event) {
477 if (event.type() == PUT) {
478 notifyDelegate(refreshLinkCache(event.key().key()));
479 } else if (event.type() == REMOVE) {
480 notifyDelegate(purgeLinkCache(event.key().key()));
481 }
482 }
483 }
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800484
485 private class InternalConfigListener implements NetworkConfigListener {
486
487 void reconfigure(CoreConfig coreConfig) {
488 if (coreConfig == null) {
489 linkDiscoveryMode = LinkDiscoveryMode.PERMISSIVE;
490 } else {
491 linkDiscoveryMode = coreConfig.linkDiscoveryMode();
492 }
493 if (linkDiscoveryMode == LinkDiscoveryMode.STRICT) {
494 // Remove any previous links to force them to go through the strict
495 // discovery process
Ray Milkey15551272016-02-10 16:22:02 -0800496 if (linkDescriptions != null) {
497 linkDescriptions.clear();
498 }
499 if (links != null) {
500 links.clear();
501 }
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800502 }
503 log.debug("config set link discovery mode to {}",
504 linkDiscoveryMode.name());
505 }
506
507 @Override
508 public void event(NetworkConfigEvent event) {
509
510 if ((event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
511 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) &&
512 event.configClass().equals(CoreConfig.class)) {
513
514 CoreConfig cfg = netCfgService.getConfig(appId, CoreConfig.class);
515 reconfigure(cfg);
516 log.info("Reconfigured");
517 }
518 }
519 }
520
521 // Configuration properties factory
522 private final ConfigFactory factory =
523 new ConfigFactory<ApplicationId, CoreConfig>(APP_SUBJECT_FACTORY,
524 CoreConfig.class,
525 "core") {
526 @Override
527 public CoreConfig createConfig() {
528 return new CoreConfig();
529 }
530 };
Ray Milkeyd0dd1352016-01-19 10:58:41 -0800531}