blob: 2e15a8f96c72b35830143218246e4e81ddabaa3f [file] [log] [blame]
Madan Jampani2ff05592014-10-10 15:42:47 -07001package org.onlab.onos.store.link.impl;
2
3import com.google.common.base.Function;
4import com.google.common.base.Predicate;
5import com.google.common.collect.FluentIterable;
6import com.google.common.collect.HashMultimap;
Madan Jampania97e8202014-10-10 17:01:33 -07007import com.google.common.collect.ImmutableList;
Madan Jampani2ff05592014-10-10 15:42:47 -07008import com.google.common.collect.Maps;
9import com.google.common.collect.SetMultimap;
10
Madan Jampania97e8202014-10-10 17:01:33 -070011import org.apache.commons.lang3.RandomUtils;
Madan Jampani2ff05592014-10-10 15:42:47 -070012import org.apache.commons.lang3.concurrent.ConcurrentUtils;
13import org.apache.felix.scr.annotations.Activate;
14import org.apache.felix.scr.annotations.Component;
15import org.apache.felix.scr.annotations.Deactivate;
16import org.apache.felix.scr.annotations.Reference;
17import org.apache.felix.scr.annotations.ReferenceCardinality;
18import org.apache.felix.scr.annotations.Service;
19import org.onlab.onos.cluster.ClusterService;
Madan Jampania97e8202014-10-10 17:01:33 -070020import org.onlab.onos.cluster.ControllerNode;
21import org.onlab.onos.cluster.NodeId;
Madan Jampani2ff05592014-10-10 15:42:47 -070022import org.onlab.onos.net.AnnotationsUtil;
23import org.onlab.onos.net.ConnectPoint;
24import org.onlab.onos.net.DefaultAnnotations;
25import org.onlab.onos.net.DefaultLink;
26import org.onlab.onos.net.DeviceId;
27import org.onlab.onos.net.Link;
28import org.onlab.onos.net.SparseAnnotations;
29import org.onlab.onos.net.Link.Type;
30import org.onlab.onos.net.LinkKey;
31import org.onlab.onos.net.Provided;
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -070032import org.onlab.onos.net.device.DeviceClockService;
Madan Jampani2ff05592014-10-10 15:42:47 -070033import org.onlab.onos.net.link.DefaultLinkDescription;
34import org.onlab.onos.net.link.LinkDescription;
35import org.onlab.onos.net.link.LinkEvent;
36import org.onlab.onos.net.link.LinkStore;
37import org.onlab.onos.net.link.LinkStoreDelegate;
38import org.onlab.onos.net.provider.ProviderId;
39import org.onlab.onos.store.AbstractStore;
Madan Jampani2ff05592014-10-10 15:42:47 -070040import org.onlab.onos.store.Timestamp;
41import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
42import org.onlab.onos.store.cluster.messaging.ClusterMessage;
43import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
44import org.onlab.onos.store.cluster.messaging.MessageSubject;
45import org.onlab.onos.store.common.impl.Timestamped;
46import org.onlab.onos.store.serializers.DistributedStoreSerializers;
47import org.onlab.onos.store.serializers.KryoSerializer;
48import org.onlab.util.KryoPool;
49import org.onlab.util.NewConcurrentHashMap;
50import org.slf4j.Logger;
51
52import java.io.IOException;
53import java.util.Collections;
Madan Jampania97e8202014-10-10 17:01:33 -070054import java.util.HashMap;
Madan Jampani2ff05592014-10-10 15:42:47 -070055import java.util.HashSet;
56import java.util.Map;
57import java.util.Set;
58import java.util.Map.Entry;
59import java.util.concurrent.ConcurrentHashMap;
60import java.util.concurrent.ConcurrentMap;
Madan Jampania97e8202014-10-10 17:01:33 -070061import java.util.concurrent.ScheduledExecutorService;
62import java.util.concurrent.TimeUnit;
Madan Jampani2ff05592014-10-10 15:42:47 -070063
Madan Jampania97e8202014-10-10 17:01:33 -070064import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
65import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Madan Jampani2ff05592014-10-10 15:42:47 -070066import static org.onlab.onos.net.DefaultAnnotations.union;
67import static org.onlab.onos.net.DefaultAnnotations.merge;
68import static org.onlab.onos.net.Link.Type.DIRECT;
69import static org.onlab.onos.net.Link.Type.INDIRECT;
70import static org.onlab.onos.net.link.LinkEvent.Type.*;
Madan Jampania97e8202014-10-10 17:01:33 -070071import static org.onlab.util.Tools.namedThreads;
Madan Jampani2ff05592014-10-10 15:42:47 -070072import static org.slf4j.LoggerFactory.getLogger;
73import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
74import static com.google.common.base.Predicates.notNull;
75
76/**
77 * Manages inventory of infrastructure links in distributed data store
78 * that uses optimistic replication and gossip based techniques.
79 */
80@Component(immediate = true)
81@Service
82public class GossipLinkStore
83 extends AbstractStore<LinkEvent, LinkStoreDelegate>
84 implements LinkStore {
85
86 private final Logger log = getLogger(getClass());
87
88 // Link inventory
89 private final ConcurrentMap<LinkKey, ConcurrentMap<ProviderId, Timestamped<LinkDescription>>> linkDescs =
90 new ConcurrentHashMap<>();
91
92 // Link instance cache
93 private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();
94
95 // Egress and ingress link sets
96 private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
97 private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();
98
99 // Remove links
100 private final Map<LinkKey, Timestamp> removedLinks = Maps.newHashMap();
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700103 protected DeviceClockService deviceClockService;
Madan Jampani2ff05592014-10-10 15:42:47 -0700104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 protected ClusterCommunicationService clusterCommunicator;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ClusterService clusterService;
110
111 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
112 @Override
113 protected void setupKryoPool() {
114 serializerPool = KryoPool.newBuilder()
115 .register(DistributedStoreSerializers.COMMON)
116 .register(InternalLinkEvent.class)
117 .register(InternalLinkRemovedEvent.class)
Madan Jampanid2054d42014-10-10 17:27:06 -0700118 .register(LinkAntiEntropyAdvertisement.class)
119 .register(LinkFragmentId.class)
Madan Jampani2ff05592014-10-10 15:42:47 -0700120 .build()
121 .populate(1);
122 }
123 };
124
Madan Jampania97e8202014-10-10 17:01:33 -0700125 private ScheduledExecutorService executor;
126
Madan Jampani2ff05592014-10-10 15:42:47 -0700127 @Activate
128 public void activate() {
129
130 clusterCommunicator.addSubscriber(
Madan Jampania97e8202014-10-10 17:01:33 -0700131 GossipLinkStoreMessageSubjects.LINK_UPDATE,
132 new InternalLinkEventListener());
Madan Jampani2ff05592014-10-10 15:42:47 -0700133 clusterCommunicator.addSubscriber(
Madan Jampania97e8202014-10-10 17:01:33 -0700134 GossipLinkStoreMessageSubjects.LINK_REMOVED,
135 new InternalLinkRemovedEventListener());
136 clusterCommunicator.addSubscriber(
137 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
138 new InternalLinkAntiEntropyAdvertisementListener());
139
140 executor =
141 newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
142
143 // TODO: Make these configurable
144 long initialDelaySec = 5;
145 long periodSec = 5;
146 // start anti-entropy thread
147 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
148 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani2ff05592014-10-10 15:42:47 -0700149
150 log.info("Started");
151 }
152
153 @Deactivate
154 public void deactivate() {
Madan Jampani3ffbb272014-10-13 11:19:37 -0700155
156 executor.shutdownNow();
157 try {
158 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
159 log.error("Timeout during executor shutdown");
160 }
161 } catch (InterruptedException e) {
162 log.error("Error during executor shutdown", e);
163 }
164
Madan Jampani2ff05592014-10-10 15:42:47 -0700165 linkDescs.clear();
166 links.clear();
167 srcLinks.clear();
168 dstLinks.clear();
169 log.info("Stopped");
170 }
171
172 @Override
173 public int getLinkCount() {
174 return links.size();
175 }
176
177 @Override
178 public Iterable<Link> getLinks() {
179 return Collections.unmodifiableCollection(links.values());
180 }
181
182 @Override
183 public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
184 // lock for iteration
185 synchronized (srcLinks) {
186 return FluentIterable.from(srcLinks.get(deviceId))
187 .transform(lookupLink())
188 .filter(notNull())
189 .toSet();
190 }
191 }
192
193 @Override
194 public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
195 // lock for iteration
196 synchronized (dstLinks) {
197 return FluentIterable.from(dstLinks.get(deviceId))
198 .transform(lookupLink())
199 .filter(notNull())
200 .toSet();
201 }
202 }
203
204 @Override
205 public Link getLink(ConnectPoint src, ConnectPoint dst) {
206 return links.get(new LinkKey(src, dst));
207 }
208
209 @Override
210 public Set<Link> getEgressLinks(ConnectPoint src) {
211 Set<Link> egress = new HashSet<>();
212 for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
213 if (linkKey.src().equals(src)) {
214 egress.add(links.get(linkKey));
215 }
216 }
217 return egress;
218 }
219
220 @Override
221 public Set<Link> getIngressLinks(ConnectPoint dst) {
222 Set<Link> ingress = new HashSet<>();
223 for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
224 if (linkKey.dst().equals(dst)) {
225 ingress.add(links.get(linkKey));
226 }
227 }
228 return ingress;
229 }
230
231 @Override
232 public LinkEvent createOrUpdateLink(ProviderId providerId,
233 LinkDescription linkDescription) {
234
235 DeviceId dstDeviceId = linkDescription.dst().deviceId();
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700236 Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
Madan Jampani2ff05592014-10-10 15:42:47 -0700237
238 final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
239
240 LinkEvent event = createOrUpdateLinkInternal(providerId, deltaDesc);
241
242 if (event != null) {
243 log.info("Notifying peers of a link update topology event from providerId: "
244 + "{} between src: {} and dst: {}",
245 providerId, linkDescription.src(), linkDescription.dst());
246 try {
247 notifyPeers(new InternalLinkEvent(providerId, deltaDesc));
248 } catch (IOException e) {
249 log.info("Failed to notify peers of a link update topology event from providerId: "
250 + "{} between src: {} and dst: {}",
251 providerId, linkDescription.src(), linkDescription.dst());
252 }
253 }
254 return event;
255 }
256
257 private LinkEvent createOrUpdateLinkInternal(
258 ProviderId providerId,
259 Timestamped<LinkDescription> linkDescription) {
260
261 LinkKey key = new LinkKey(linkDescription.value().src(), linkDescription.value().dst());
262 ConcurrentMap<ProviderId, Timestamped<LinkDescription>> descs = getLinkDescriptions(key);
263
264 synchronized (descs) {
265 // if the link was previously removed, we should proceed if and
266 // only if this request is more recent.
267 Timestamp linkRemovedTimestamp = removedLinks.get(key);
268 if (linkRemovedTimestamp != null) {
269 if (linkDescription.isNewer(linkRemovedTimestamp)) {
270 removedLinks.remove(key);
271 } else {
272 return null;
273 }
274 }
275
276 final Link oldLink = links.get(key);
277 // update description
278 createOrUpdateLinkDescription(descs, providerId, linkDescription);
279 final Link newLink = composeLink(descs);
280 if (oldLink == null) {
281 return createLink(key, newLink);
282 }
283 return updateLink(key, oldLink, newLink);
284 }
285 }
286
287 // Guarded by linkDescs value (=locking each Link)
288 private Timestamped<LinkDescription> createOrUpdateLinkDescription(
289 ConcurrentMap<ProviderId, Timestamped<LinkDescription>> existingLinkDescriptions,
290 ProviderId providerId,
291 Timestamped<LinkDescription> linkDescription) {
292
293 // merge existing attributes and merge
294 Timestamped<LinkDescription> existingLinkDescription = existingLinkDescriptions.get(providerId);
295 if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) {
296 return null;
297 }
298 Timestamped<LinkDescription> newLinkDescription = linkDescription;
299 if (existingLinkDescription != null) {
300 SparseAnnotations merged = union(existingLinkDescription.value().annotations(),
301 linkDescription.value().annotations());
302 newLinkDescription = new Timestamped<LinkDescription>(
303 new DefaultLinkDescription(
304 linkDescription.value().src(),
305 linkDescription.value().dst(),
306 linkDescription.value().type(), merged),
307 linkDescription.timestamp());
308 }
309 return existingLinkDescriptions.put(providerId, newLinkDescription);
310 }
311
312 // Creates and stores the link and returns the appropriate event.
313 // Guarded by linkDescs value (=locking each Link)
314 private LinkEvent createLink(LinkKey key, Link newLink) {
315
316 if (newLink.providerId().isAncillary()) {
317 // TODO: revisit ancillary only Link handling
318
319 // currently treating ancillary only as down (not visible outside)
320 return null;
321 }
322
323 links.put(key, newLink);
324 srcLinks.put(newLink.src().deviceId(), key);
325 dstLinks.put(newLink.dst().deviceId(), key);
326 return new LinkEvent(LINK_ADDED, newLink);
327 }
328
329 // Updates, if necessary the specified link and returns the appropriate event.
330 // Guarded by linkDescs value (=locking each Link)
331 private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
332
333 if (newLink.providerId().isAncillary()) {
334 // TODO: revisit ancillary only Link handling
335
336 // currently treating ancillary only as down (not visible outside)
337 return null;
338 }
339
340 if ((oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
341 !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
342
343 links.put(key, newLink);
344 // strictly speaking following can be ommitted
345 srcLinks.put(oldLink.src().deviceId(), key);
346 dstLinks.put(oldLink.dst().deviceId(), key);
347 return new LinkEvent(LINK_UPDATED, newLink);
348 }
349 return null;
350 }
351
352 @Override
353 public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
354 final LinkKey key = new LinkKey(src, dst);
355
356 DeviceId dstDeviceId = dst.deviceId();
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700357 Timestamp timestamp = deviceClockService.getTimestamp(dstDeviceId);
Madan Jampani2ff05592014-10-10 15:42:47 -0700358
359 LinkEvent event = removeLinkInternal(key, timestamp);
360
361 if (event != null) {
362 log.info("Notifying peers of a link removed topology event for a link "
363 + "between src: {} and dst: {}", src, dst);
364 try {
365 notifyPeers(new InternalLinkRemovedEvent(key, timestamp));
366 } catch (IOException e) {
367 log.error("Failed to notify peers of a link removed topology event for a link "
368 + "between src: {} and dst: {}", src, dst);
369 }
370 }
371 return event;
372 }
373
374 private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
375 ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDescriptions =
376 getLinkDescriptions(key);
377 synchronized (linkDescriptions) {
378 // accept removal request if given timestamp is newer than
379 // the latest Timestamp from Primary provider
380 ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions);
381 if (linkDescriptions.get(primaryProviderId).isNewer(timestamp)) {
382 return null;
383 }
384 removedLinks.put(key, timestamp);
385 Link link = links.remove(key);
386 linkDescriptions.clear();
387 if (link != null) {
388 srcLinks.remove(link.src().deviceId(), key);
389 dstLinks.remove(link.dst().deviceId(), key);
390 return new LinkEvent(LINK_REMOVED, link);
391 }
392 return null;
393 }
394 }
395
396 private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
397 return synchronizedSetMultimap(HashMultimap.<K, V>create());
398 }
399
400 /**
401 * @return primary ProviderID, or randomly chosen one if none exists
402 */
403 private ProviderId pickPrimaryProviderId(
404 ConcurrentMap<ProviderId, Timestamped<LinkDescription>> providerDescs) {
405
406 ProviderId fallBackPrimary = null;
407 for (Entry<ProviderId, Timestamped<LinkDescription>> e : providerDescs.entrySet()) {
408 if (!e.getKey().isAncillary()) {
409 return e.getKey();
410 } else if (fallBackPrimary == null) {
411 // pick randomly as a fallback in case there is no primary
412 fallBackPrimary = e.getKey();
413 }
414 }
415 return fallBackPrimary;
416 }
417
418 private Link composeLink(ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
419 ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions);
420 Timestamped<LinkDescription> base = linkDescriptions.get(primaryProviderId);
421
422 ConnectPoint src = base.value().src();
423 ConnectPoint dst = base.value().dst();
424 Type type = base.value().type();
425 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
426 annotations = merge(annotations, base.value().annotations());
427
428 for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) {
429 if (primaryProviderId.equals(e.getKey())) {
430 continue;
431 }
432
433 // TODO: should keep track of Description timestamp
434 // and only merge conflicting keys when timestamp is newer
435 // Currently assuming there will never be a key conflict between
436 // providers
437
438 // annotation merging. not so efficient, should revisit later
439 annotations = merge(annotations, e.getValue().value().annotations());
440 }
441
442 return new DefaultLink(primaryProviderId , src, dst, type, annotations);
443 }
444
445 private ConcurrentMap<ProviderId, Timestamped<LinkDescription>> getLinkDescriptions(LinkKey key) {
446 return ConcurrentUtils.createIfAbsentUnchecked(linkDescs, key,
447 NewConcurrentHashMap.<ProviderId, Timestamped<LinkDescription>>ifNeeded());
448 }
449
Madan Jampania97e8202014-10-10 17:01:33 -0700450 private Timestamped<LinkDescription> getLinkDescription(LinkKey key, ProviderId providerId) {
451 return getLinkDescriptions(key).get(providerId);
452 }
453
Madan Jampani2ff05592014-10-10 15:42:47 -0700454 private final Function<LinkKey, Link> lookupLink = new LookupLink();
455 private Function<LinkKey, Link> lookupLink() {
456 return lookupLink;
457 }
458
459 private final class LookupLink implements Function<LinkKey, Link> {
460 @Override
461 public Link apply(LinkKey input) {
462 return links.get(input);
463 }
464 }
465
466 private static final Predicate<Provided> IS_PRIMARY = new IsPrimary();
467 private static final Predicate<Provided> isPrimary() {
468 return IS_PRIMARY;
469 }
470
471 private static final class IsPrimary implements Predicate<Provided> {
472
473 @Override
474 public boolean apply(Provided input) {
475 return !input.providerId().isAncillary();
476 }
477 }
478
479 private void notifyDelegateIfNotNull(LinkEvent event) {
480 if (event != null) {
481 notifyDelegate(event);
482 }
483 }
484
485 // TODO: should we be throwing exception?
486 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
487 ClusterMessage message = new ClusterMessage(
488 clusterService.getLocalNode().id(),
489 subject,
490 SERIALIZER.encode(event));
491 clusterCommunicator.broadcast(message);
492 }
493
Madan Jampania97e8202014-10-10 17:01:33 -0700494 // TODO: should we be throwing exception?
495 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) {
496 try {
497 ClusterMessage message = new ClusterMessage(
498 clusterService.getLocalNode().id(),
499 subject,
500 SERIALIZER.encode(event));
501 clusterCommunicator.unicast(message, recipient);
502 } catch (IOException e) {
503 log.error("Failed to send a {} message to {}", subject.value(), recipient);
504 }
505 }
506
Madan Jampani2ff05592014-10-10 15:42:47 -0700507 private void notifyPeers(InternalLinkEvent event) throws IOException {
508 broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
509 }
510
511 private void notifyPeers(InternalLinkRemovedEvent event) throws IOException {
512 broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
513 }
514
Madan Jampania97e8202014-10-10 17:01:33 -0700515 private void notifyPeer(NodeId peer, InternalLinkEvent event) {
516 unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
517 }
518
519 private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
520 unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
521 }
522
523 private final class SendAdvertisementTask implements Runnable {
524
525 @Override
526 public void run() {
527 if (Thread.currentThread().isInterrupted()) {
528 log.info("Interrupted, quitting");
529 return;
530 }
531
532 try {
533 final NodeId self = clusterService.getLocalNode().id();
534 Set<ControllerNode> nodes = clusterService.getNodes();
535
536 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
537 .transform(toNodeId())
538 .toList();
539
540 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHI37083082014-10-13 10:38:38 -0700541 log.debug("No other peers in the cluster.");
Madan Jampania97e8202014-10-10 17:01:33 -0700542 return;
543 }
544
545 NodeId peer;
546 do {
547 int idx = RandomUtils.nextInt(0, nodeIds.size());
548 peer = nodeIds.get(idx);
549 } while (peer.equals(self));
550
551 LinkAntiEntropyAdvertisement ad = createAdvertisement();
552
553 if (Thread.currentThread().isInterrupted()) {
554 log.info("Interrupted, quitting");
555 return;
556 }
557
558 try {
559 unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
560 } catch (Exception e) {
561 log.error("Failed to send anti-entropy advertisement", e);
562 return;
563 }
564 } catch (Exception e) {
565 // catch all Exception to avoid Scheduled task being suppressed.
566 log.error("Exception thrown while sending advertisement", e);
567 }
568 }
569 }
570
571 private LinkAntiEntropyAdvertisement createAdvertisement() {
572 final NodeId self = clusterService.getLocalNode().id();
573
574 Map<LinkFragmentId, Timestamp> linkTimestamps = new HashMap<>(linkDescs.size());
575 Map<LinkKey, Timestamp> linkTombstones = new HashMap<>(removedLinks.size());
576
577 for (Entry<LinkKey, ConcurrentMap<ProviderId, Timestamped<LinkDescription>>>
578 provs : linkDescs.entrySet()) {
579
580 final LinkKey linkKey = provs.getKey();
581 final ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDesc = provs.getValue();
582 synchronized (linkDesc) {
583 for (Map.Entry<ProviderId, Timestamped<LinkDescription>> e : linkDesc.entrySet()) {
584 linkTimestamps.put(new LinkFragmentId(linkKey, e.getKey()), e.getValue().timestamp());
585 }
586 }
587 }
588
589 linkTombstones.putAll(removedLinks);
590
591 return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
592 }
593
594 private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement advertisement) {
595
596 NodeId peer = advertisement.sender();
597
598 Map<LinkFragmentId, Timestamp> linkTimestamps = advertisement.linkTimestamps();
599 Map<LinkKey, Timestamp> linkTombstones = advertisement.linkTombstones();
600 for (Map.Entry<LinkFragmentId, Timestamp> entry : linkTimestamps.entrySet()) {
601 LinkFragmentId linkFragmentId = entry.getKey();
602 Timestamp peerTimestamp = entry.getValue();
603
604 LinkKey key = linkFragmentId.linkKey();
605 ProviderId providerId = linkFragmentId.providerId();
606
607 Timestamped<LinkDescription> linkDescription = getLinkDescription(key, providerId);
608 if (linkDescription.isNewer(peerTimestamp)) {
609 // I have more recent link description. update peer.
610 notifyPeer(peer, new InternalLinkEvent(providerId, linkDescription));
611 }
612 // else TODO: Peer has more recent link description. request it.
613
614 Timestamp linkRemovedTimestamp = removedLinks.get(key);
615 if (linkRemovedTimestamp != null && linkRemovedTimestamp.compareTo(peerTimestamp) > 0) {
616 // peer has a zombie link. update peer.
617 notifyPeer(peer, new InternalLinkRemovedEvent(key, linkRemovedTimestamp));
618 }
619 }
620
621 for (Map.Entry<LinkKey, Timestamp> entry : linkTombstones.entrySet()) {
622 LinkKey key = entry.getKey();
623 Timestamp peerTimestamp = entry.getValue();
624
625 ProviderId primaryProviderId = pickPrimaryProviderId(getLinkDescriptions(key));
626 if (primaryProviderId != null) {
627 if (!getLinkDescription(key, primaryProviderId).isNewer(peerTimestamp)) {
628 notifyDelegateIfNotNull(removeLinkInternal(key, peerTimestamp));
629 }
630 }
631 }
632 }
633
Madan Jampani2ff05592014-10-10 15:42:47 -0700634 private class InternalLinkEventListener implements ClusterMessageHandler {
635 @Override
636 public void handle(ClusterMessage message) {
637
638 log.info("Received link event from peer: {}", message.sender());
639 InternalLinkEvent event = (InternalLinkEvent) SERIALIZER.decode(message.payload());
640
641 ProviderId providerId = event.providerId();
642 Timestamped<LinkDescription> linkDescription = event.linkDescription();
643
644 notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
645 }
646 }
647
648 private class InternalLinkRemovedEventListener implements ClusterMessageHandler {
649 @Override
650 public void handle(ClusterMessage message) {
651
652 log.info("Received link removed event from peer: {}", message.sender());
653 InternalLinkRemovedEvent event = (InternalLinkRemovedEvent) SERIALIZER.decode(message.payload());
654
655 LinkKey linkKey = event.linkKey();
656 Timestamp timestamp = event.timestamp();
657
658 notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
659 }
660 }
Madan Jampania97e8202014-10-10 17:01:33 -0700661
662 private final class InternalLinkAntiEntropyAdvertisementListener implements ClusterMessageHandler {
663
664 @Override
665 public void handle(ClusterMessage message) {
666 log.info("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
667 LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
668 handleAntiEntropyAdvertisement(advertisement);
669 }
670 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700671}