blob: 043917e61e546b5a6258006fc42b0c4043ad8380 [file] [log] [blame]
Madan Jampani2ff05592014-10-10 15:42:47 -07001package org.onlab.onos.store.link.impl;
2
3import com.google.common.base.Function;
4import com.google.common.base.Predicate;
5import com.google.common.collect.FluentIterable;
6import com.google.common.collect.HashMultimap;
Madan Jampania97e8202014-10-10 17:01:33 -07007import com.google.common.collect.ImmutableList;
Madan Jampani2ff05592014-10-10 15:42:47 -07008import com.google.common.collect.Maps;
9import com.google.common.collect.SetMultimap;
10
Madan Jampania97e8202014-10-10 17:01:33 -070011import org.apache.commons.lang3.RandomUtils;
Madan Jampani2ff05592014-10-10 15:42:47 -070012import org.apache.commons.lang3.concurrent.ConcurrentUtils;
13import org.apache.felix.scr.annotations.Activate;
14import org.apache.felix.scr.annotations.Component;
15import org.apache.felix.scr.annotations.Deactivate;
16import org.apache.felix.scr.annotations.Reference;
17import org.apache.felix.scr.annotations.ReferenceCardinality;
18import org.apache.felix.scr.annotations.Service;
19import org.onlab.onos.cluster.ClusterService;
Madan Jampania97e8202014-10-10 17:01:33 -070020import org.onlab.onos.cluster.ControllerNode;
21import org.onlab.onos.cluster.NodeId;
Madan Jampani2ff05592014-10-10 15:42:47 -070022import org.onlab.onos.net.AnnotationsUtil;
23import org.onlab.onos.net.ConnectPoint;
24import org.onlab.onos.net.DefaultAnnotations;
25import org.onlab.onos.net.DefaultLink;
26import org.onlab.onos.net.DeviceId;
27import org.onlab.onos.net.Link;
28import org.onlab.onos.net.SparseAnnotations;
29import org.onlab.onos.net.Link.Type;
30import org.onlab.onos.net.LinkKey;
31import org.onlab.onos.net.Provided;
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -070032import org.onlab.onos.net.device.DeviceClockService;
Madan Jampani2ff05592014-10-10 15:42:47 -070033import org.onlab.onos.net.link.DefaultLinkDescription;
34import org.onlab.onos.net.link.LinkDescription;
35import org.onlab.onos.net.link.LinkEvent;
36import org.onlab.onos.net.link.LinkStore;
37import org.onlab.onos.net.link.LinkStoreDelegate;
38import org.onlab.onos.net.provider.ProviderId;
39import org.onlab.onos.store.AbstractStore;
Madan Jampani2ff05592014-10-10 15:42:47 -070040import org.onlab.onos.store.Timestamp;
41import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
42import org.onlab.onos.store.cluster.messaging.ClusterMessage;
43import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
44import org.onlab.onos.store.cluster.messaging.MessageSubject;
45import org.onlab.onos.store.common.impl.Timestamped;
46import org.onlab.onos.store.serializers.DistributedStoreSerializers;
47import org.onlab.onos.store.serializers.KryoSerializer;
48import org.onlab.util.KryoPool;
49import org.onlab.util.NewConcurrentHashMap;
50import org.slf4j.Logger;
51
52import java.io.IOException;
53import java.util.Collections;
Madan Jampania97e8202014-10-10 17:01:33 -070054import java.util.HashMap;
Madan Jampani2ff05592014-10-10 15:42:47 -070055import java.util.HashSet;
56import java.util.Map;
57import java.util.Set;
58import java.util.Map.Entry;
59import java.util.concurrent.ConcurrentHashMap;
60import java.util.concurrent.ConcurrentMap;
Madan Jampania97e8202014-10-10 17:01:33 -070061import java.util.concurrent.ScheduledExecutorService;
62import java.util.concurrent.TimeUnit;
Madan Jampani2ff05592014-10-10 15:42:47 -070063
Madan Jampania97e8202014-10-10 17:01:33 -070064import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
65import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Madan Jampani2ff05592014-10-10 15:42:47 -070066import static org.onlab.onos.net.DefaultAnnotations.union;
67import static org.onlab.onos.net.DefaultAnnotations.merge;
68import static org.onlab.onos.net.Link.Type.DIRECT;
69import static org.onlab.onos.net.Link.Type.INDIRECT;
Yuta HIGUCHI18ab8a92014-10-13 11:16:19 -070070import static org.onlab.onos.net.LinkKey.linkKey;
Madan Jampani2ff05592014-10-10 15:42:47 -070071import static org.onlab.onos.net.link.LinkEvent.Type.*;
Madan Jampania97e8202014-10-10 17:01:33 -070072import static org.onlab.util.Tools.namedThreads;
Madan Jampani2ff05592014-10-10 15:42:47 -070073import static org.slf4j.LoggerFactory.getLogger;
74import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
75import static com.google.common.base.Predicates.notNull;
76
77/**
78 * Manages inventory of infrastructure links in distributed data store
79 * that uses optimistic replication and gossip based techniques.
80 */
81@Component(immediate = true)
82@Service
83public class GossipLinkStore
84 extends AbstractStore<LinkEvent, LinkStoreDelegate>
85 implements LinkStore {
86
87 private final Logger log = getLogger(getClass());
88
89 // Link inventory
90 private final ConcurrentMap<LinkKey, ConcurrentMap<ProviderId, Timestamped<LinkDescription>>> linkDescs =
91 new ConcurrentHashMap<>();
92
93 // Link instance cache
94 private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();
95
96 // Egress and ingress link sets
97 private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
98 private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();
99
100 // Remove links
101 private final Map<LinkKey, Timestamp> removedLinks = Maps.newHashMap();
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700104 protected DeviceClockService deviceClockService;
Madan Jampani2ff05592014-10-10 15:42:47 -0700105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ClusterCommunicationService clusterCommunicator;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected ClusterService clusterService;
111
112 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
113 @Override
114 protected void setupKryoPool() {
115 serializerPool = KryoPool.newBuilder()
116 .register(DistributedStoreSerializers.COMMON)
117 .register(InternalLinkEvent.class)
118 .register(InternalLinkRemovedEvent.class)
Madan Jampanid2054d42014-10-10 17:27:06 -0700119 .register(LinkAntiEntropyAdvertisement.class)
120 .register(LinkFragmentId.class)
Madan Jampani2ff05592014-10-10 15:42:47 -0700121 .build()
122 .populate(1);
123 }
124 };
125
Madan Jampania97e8202014-10-10 17:01:33 -0700126 private ScheduledExecutorService executor;
127
Madan Jampani2ff05592014-10-10 15:42:47 -0700128 @Activate
129 public void activate() {
130
131 clusterCommunicator.addSubscriber(
Madan Jampania97e8202014-10-10 17:01:33 -0700132 GossipLinkStoreMessageSubjects.LINK_UPDATE,
133 new InternalLinkEventListener());
Madan Jampani2ff05592014-10-10 15:42:47 -0700134 clusterCommunicator.addSubscriber(
Madan Jampania97e8202014-10-10 17:01:33 -0700135 GossipLinkStoreMessageSubjects.LINK_REMOVED,
136 new InternalLinkRemovedEventListener());
137 clusterCommunicator.addSubscriber(
138 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
139 new InternalLinkAntiEntropyAdvertisementListener());
140
141 executor =
142 newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
143
144 // TODO: Make these configurable
145 long initialDelaySec = 5;
146 long periodSec = 5;
147 // start anti-entropy thread
148 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
149 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani2ff05592014-10-10 15:42:47 -0700150
151 log.info("Started");
152 }
153
154 @Deactivate
155 public void deactivate() {
Madan Jampani3ffbb272014-10-13 11:19:37 -0700156
157 executor.shutdownNow();
158 try {
159 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
160 log.error("Timeout during executor shutdown");
161 }
162 } catch (InterruptedException e) {
163 log.error("Error during executor shutdown", e);
164 }
165
Madan Jampani2ff05592014-10-10 15:42:47 -0700166 linkDescs.clear();
167 links.clear();
168 srcLinks.clear();
169 dstLinks.clear();
170 log.info("Stopped");
171 }
172
173 @Override
174 public int getLinkCount() {
175 return links.size();
176 }
177
178 @Override
179 public Iterable<Link> getLinks() {
180 return Collections.unmodifiableCollection(links.values());
181 }
182
183 @Override
184 public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
185 // lock for iteration
186 synchronized (srcLinks) {
187 return FluentIterable.from(srcLinks.get(deviceId))
188 .transform(lookupLink())
189 .filter(notNull())
190 .toSet();
191 }
192 }
193
194 @Override
195 public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
196 // lock for iteration
197 synchronized (dstLinks) {
198 return FluentIterable.from(dstLinks.get(deviceId))
199 .transform(lookupLink())
200 .filter(notNull())
201 .toSet();
202 }
203 }
204
205 @Override
206 public Link getLink(ConnectPoint src, ConnectPoint dst) {
Yuta HIGUCHI18ab8a92014-10-13 11:16:19 -0700207 return links.get(linkKey(src, dst));
Madan Jampani2ff05592014-10-10 15:42:47 -0700208 }
209
210 @Override
211 public Set<Link> getEgressLinks(ConnectPoint src) {
212 Set<Link> egress = new HashSet<>();
213 for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
214 if (linkKey.src().equals(src)) {
215 egress.add(links.get(linkKey));
216 }
217 }
218 return egress;
219 }
220
221 @Override
222 public Set<Link> getIngressLinks(ConnectPoint dst) {
223 Set<Link> ingress = new HashSet<>();
224 for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
225 if (linkKey.dst().equals(dst)) {
226 ingress.add(links.get(linkKey));
227 }
228 }
229 return ingress;
230 }
231
232 @Override
233 public LinkEvent createOrUpdateLink(ProviderId providerId,
234 LinkDescription linkDescription) {
235
236 DeviceId dstDeviceId = linkDescription.dst().deviceId();
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700237 Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
Madan Jampani2ff05592014-10-10 15:42:47 -0700238
239 final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
240
Yuta HIGUCHI990aecc2014-10-13 22:21:42 -0700241 LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
Yuta HIGUCHI92cd51e2014-10-13 10:51:45 -0700242 final LinkEvent event;
243 final Timestamped<LinkDescription> mergedDesc;
244 synchronized (getLinkDescriptions(key)) {
245 event = createOrUpdateLinkInternal(providerId, deltaDesc);
246 mergedDesc = getLinkDescriptions(key).get(providerId);
247 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700248
249 if (event != null) {
250 log.info("Notifying peers of a link update topology event from providerId: "
251 + "{} between src: {} and dst: {}",
252 providerId, linkDescription.src(), linkDescription.dst());
253 try {
Yuta HIGUCHI92cd51e2014-10-13 10:51:45 -0700254 notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
Madan Jampani2ff05592014-10-10 15:42:47 -0700255 } catch (IOException e) {
256 log.info("Failed to notify peers of a link update topology event from providerId: "
257 + "{} between src: {} and dst: {}",
258 providerId, linkDescription.src(), linkDescription.dst());
259 }
260 }
261 return event;
262 }
263
264 private LinkEvent createOrUpdateLinkInternal(
265 ProviderId providerId,
266 Timestamped<LinkDescription> linkDescription) {
267
Yuta HIGUCHI990aecc2014-10-13 22:21:42 -0700268 LinkKey key = linkKey(linkDescription.value().src(),
269 linkDescription.value().dst());
Madan Jampani2ff05592014-10-10 15:42:47 -0700270 ConcurrentMap<ProviderId, Timestamped<LinkDescription>> descs = getLinkDescriptions(key);
271
272 synchronized (descs) {
273 // if the link was previously removed, we should proceed if and
274 // only if this request is more recent.
275 Timestamp linkRemovedTimestamp = removedLinks.get(key);
276 if (linkRemovedTimestamp != null) {
277 if (linkDescription.isNewer(linkRemovedTimestamp)) {
278 removedLinks.remove(key);
279 } else {
280 return null;
281 }
282 }
283
284 final Link oldLink = links.get(key);
285 // update description
286 createOrUpdateLinkDescription(descs, providerId, linkDescription);
287 final Link newLink = composeLink(descs);
288 if (oldLink == null) {
289 return createLink(key, newLink);
290 }
291 return updateLink(key, oldLink, newLink);
292 }
293 }
294
295 // Guarded by linkDescs value (=locking each Link)
296 private Timestamped<LinkDescription> createOrUpdateLinkDescription(
297 ConcurrentMap<ProviderId, Timestamped<LinkDescription>> existingLinkDescriptions,
298 ProviderId providerId,
299 Timestamped<LinkDescription> linkDescription) {
300
301 // merge existing attributes and merge
302 Timestamped<LinkDescription> existingLinkDescription = existingLinkDescriptions.get(providerId);
303 if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) {
304 return null;
305 }
306 Timestamped<LinkDescription> newLinkDescription = linkDescription;
307 if (existingLinkDescription != null) {
308 SparseAnnotations merged = union(existingLinkDescription.value().annotations(),
309 linkDescription.value().annotations());
310 newLinkDescription = new Timestamped<LinkDescription>(
311 new DefaultLinkDescription(
312 linkDescription.value().src(),
313 linkDescription.value().dst(),
314 linkDescription.value().type(), merged),
315 linkDescription.timestamp());
316 }
317 return existingLinkDescriptions.put(providerId, newLinkDescription);
318 }
319
320 // Creates and stores the link and returns the appropriate event.
321 // Guarded by linkDescs value (=locking each Link)
322 private LinkEvent createLink(LinkKey key, Link newLink) {
323
324 if (newLink.providerId().isAncillary()) {
325 // TODO: revisit ancillary only Link handling
326
327 // currently treating ancillary only as down (not visible outside)
328 return null;
329 }
330
331 links.put(key, newLink);
332 srcLinks.put(newLink.src().deviceId(), key);
333 dstLinks.put(newLink.dst().deviceId(), key);
334 return new LinkEvent(LINK_ADDED, newLink);
335 }
336
337 // Updates, if necessary the specified link and returns the appropriate event.
338 // Guarded by linkDescs value (=locking each Link)
339 private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
340
341 if (newLink.providerId().isAncillary()) {
342 // TODO: revisit ancillary only Link handling
343
344 // currently treating ancillary only as down (not visible outside)
345 return null;
346 }
347
348 if ((oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
349 !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
350
351 links.put(key, newLink);
352 // strictly speaking following can be ommitted
353 srcLinks.put(oldLink.src().deviceId(), key);
354 dstLinks.put(oldLink.dst().deviceId(), key);
355 return new LinkEvent(LINK_UPDATED, newLink);
356 }
357 return null;
358 }
359
360 @Override
361 public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
Yuta HIGUCHI18ab8a92014-10-13 11:16:19 -0700362 final LinkKey key = linkKey(src, dst);
Madan Jampani2ff05592014-10-10 15:42:47 -0700363
364 DeviceId dstDeviceId = dst.deviceId();
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700365 Timestamp timestamp = deviceClockService.getTimestamp(dstDeviceId);
Madan Jampani2ff05592014-10-10 15:42:47 -0700366
367 LinkEvent event = removeLinkInternal(key, timestamp);
368
369 if (event != null) {
370 log.info("Notifying peers of a link removed topology event for a link "
371 + "between src: {} and dst: {}", src, dst);
372 try {
373 notifyPeers(new InternalLinkRemovedEvent(key, timestamp));
374 } catch (IOException e) {
375 log.error("Failed to notify peers of a link removed topology event for a link "
376 + "between src: {} and dst: {}", src, dst);
377 }
378 }
379 return event;
380 }
381
382 private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
383 ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDescriptions =
384 getLinkDescriptions(key);
385 synchronized (linkDescriptions) {
386 // accept removal request if given timestamp is newer than
387 // the latest Timestamp from Primary provider
388 ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions);
389 if (linkDescriptions.get(primaryProviderId).isNewer(timestamp)) {
390 return null;
391 }
392 removedLinks.put(key, timestamp);
393 Link link = links.remove(key);
394 linkDescriptions.clear();
395 if (link != null) {
396 srcLinks.remove(link.src().deviceId(), key);
397 dstLinks.remove(link.dst().deviceId(), key);
398 return new LinkEvent(LINK_REMOVED, link);
399 }
400 return null;
401 }
402 }
403
404 private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
405 return synchronizedSetMultimap(HashMultimap.<K, V>create());
406 }
407
408 /**
409 * @return primary ProviderID, or randomly chosen one if none exists
410 */
411 private ProviderId pickPrimaryProviderId(
412 ConcurrentMap<ProviderId, Timestamped<LinkDescription>> providerDescs) {
413
414 ProviderId fallBackPrimary = null;
415 for (Entry<ProviderId, Timestamped<LinkDescription>> e : providerDescs.entrySet()) {
416 if (!e.getKey().isAncillary()) {
417 return e.getKey();
418 } else if (fallBackPrimary == null) {
419 // pick randomly as a fallback in case there is no primary
420 fallBackPrimary = e.getKey();
421 }
422 }
423 return fallBackPrimary;
424 }
425
426 private Link composeLink(ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
427 ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions);
428 Timestamped<LinkDescription> base = linkDescriptions.get(primaryProviderId);
429
430 ConnectPoint src = base.value().src();
431 ConnectPoint dst = base.value().dst();
432 Type type = base.value().type();
433 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
434 annotations = merge(annotations, base.value().annotations());
435
436 for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) {
437 if (primaryProviderId.equals(e.getKey())) {
438 continue;
439 }
440
441 // TODO: should keep track of Description timestamp
442 // and only merge conflicting keys when timestamp is newer
443 // Currently assuming there will never be a key conflict between
444 // providers
445
446 // annotation merging. not so efficient, should revisit later
447 annotations = merge(annotations, e.getValue().value().annotations());
448 }
449
450 return new DefaultLink(primaryProviderId , src, dst, type, annotations);
451 }
452
453 private ConcurrentMap<ProviderId, Timestamped<LinkDescription>> getLinkDescriptions(LinkKey key) {
454 return ConcurrentUtils.createIfAbsentUnchecked(linkDescs, key,
455 NewConcurrentHashMap.<ProviderId, Timestamped<LinkDescription>>ifNeeded());
456 }
457
Madan Jampania97e8202014-10-10 17:01:33 -0700458 private Timestamped<LinkDescription> getLinkDescription(LinkKey key, ProviderId providerId) {
459 return getLinkDescriptions(key).get(providerId);
460 }
461
Madan Jampani2ff05592014-10-10 15:42:47 -0700462 private final Function<LinkKey, Link> lookupLink = new LookupLink();
463 private Function<LinkKey, Link> lookupLink() {
464 return lookupLink;
465 }
466
467 private final class LookupLink implements Function<LinkKey, Link> {
468 @Override
469 public Link apply(LinkKey input) {
470 return links.get(input);
471 }
472 }
473
474 private static final Predicate<Provided> IS_PRIMARY = new IsPrimary();
475 private static final Predicate<Provided> isPrimary() {
476 return IS_PRIMARY;
477 }
478
479 private static final class IsPrimary implements Predicate<Provided> {
480
481 @Override
482 public boolean apply(Provided input) {
483 return !input.providerId().isAncillary();
484 }
485 }
486
487 private void notifyDelegateIfNotNull(LinkEvent event) {
488 if (event != null) {
489 notifyDelegate(event);
490 }
491 }
492
493 // TODO: should we be throwing exception?
494 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
495 ClusterMessage message = new ClusterMessage(
496 clusterService.getLocalNode().id(),
497 subject,
498 SERIALIZER.encode(event));
499 clusterCommunicator.broadcast(message);
500 }
501
Madan Jampania97e8202014-10-10 17:01:33 -0700502 // TODO: should we be throwing exception?
503 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) {
504 try {
505 ClusterMessage message = new ClusterMessage(
506 clusterService.getLocalNode().id(),
507 subject,
508 SERIALIZER.encode(event));
509 clusterCommunicator.unicast(message, recipient);
510 } catch (IOException e) {
511 log.error("Failed to send a {} message to {}", subject.value(), recipient);
512 }
513 }
514
Madan Jampani2ff05592014-10-10 15:42:47 -0700515 private void notifyPeers(InternalLinkEvent event) throws IOException {
516 broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
517 }
518
519 private void notifyPeers(InternalLinkRemovedEvent event) throws IOException {
520 broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
521 }
522
Madan Jampania97e8202014-10-10 17:01:33 -0700523 private void notifyPeer(NodeId peer, InternalLinkEvent event) {
524 unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
525 }
526
527 private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
528 unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
529 }
530
531 private final class SendAdvertisementTask implements Runnable {
532
533 @Override
534 public void run() {
535 if (Thread.currentThread().isInterrupted()) {
536 log.info("Interrupted, quitting");
537 return;
538 }
539
540 try {
541 final NodeId self = clusterService.getLocalNode().id();
542 Set<ControllerNode> nodes = clusterService.getNodes();
543
544 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
545 .transform(toNodeId())
546 .toList();
547
548 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHI37083082014-10-13 10:38:38 -0700549 log.debug("No other peers in the cluster.");
Madan Jampania97e8202014-10-10 17:01:33 -0700550 return;
551 }
552
553 NodeId peer;
554 do {
555 int idx = RandomUtils.nextInt(0, nodeIds.size());
556 peer = nodeIds.get(idx);
557 } while (peer.equals(self));
558
559 LinkAntiEntropyAdvertisement ad = createAdvertisement();
560
561 if (Thread.currentThread().isInterrupted()) {
562 log.info("Interrupted, quitting");
563 return;
564 }
565
566 try {
567 unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
568 } catch (Exception e) {
569 log.error("Failed to send anti-entropy advertisement", e);
570 return;
571 }
572 } catch (Exception e) {
573 // catch all Exception to avoid Scheduled task being suppressed.
574 log.error("Exception thrown while sending advertisement", e);
575 }
576 }
577 }
578
579 private LinkAntiEntropyAdvertisement createAdvertisement() {
580 final NodeId self = clusterService.getLocalNode().id();
581
582 Map<LinkFragmentId, Timestamp> linkTimestamps = new HashMap<>(linkDescs.size());
583 Map<LinkKey, Timestamp> linkTombstones = new HashMap<>(removedLinks.size());
584
585 for (Entry<LinkKey, ConcurrentMap<ProviderId, Timestamped<LinkDescription>>>
586 provs : linkDescs.entrySet()) {
587
588 final LinkKey linkKey = provs.getKey();
589 final ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDesc = provs.getValue();
590 synchronized (linkDesc) {
591 for (Map.Entry<ProviderId, Timestamped<LinkDescription>> e : linkDesc.entrySet()) {
592 linkTimestamps.put(new LinkFragmentId(linkKey, e.getKey()), e.getValue().timestamp());
593 }
594 }
595 }
596
597 linkTombstones.putAll(removedLinks);
598
599 return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
600 }
601
602 private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement advertisement) {
603
604 NodeId peer = advertisement.sender();
605
606 Map<LinkFragmentId, Timestamp> linkTimestamps = advertisement.linkTimestamps();
607 Map<LinkKey, Timestamp> linkTombstones = advertisement.linkTombstones();
608 for (Map.Entry<LinkFragmentId, Timestamp> entry : linkTimestamps.entrySet()) {
609 LinkFragmentId linkFragmentId = entry.getKey();
610 Timestamp peerTimestamp = entry.getValue();
611
612 LinkKey key = linkFragmentId.linkKey();
613 ProviderId providerId = linkFragmentId.providerId();
614
615 Timestamped<LinkDescription> linkDescription = getLinkDescription(key, providerId);
616 if (linkDescription.isNewer(peerTimestamp)) {
617 // I have more recent link description. update peer.
618 notifyPeer(peer, new InternalLinkEvent(providerId, linkDescription));
619 }
620 // else TODO: Peer has more recent link description. request it.
621
622 Timestamp linkRemovedTimestamp = removedLinks.get(key);
623 if (linkRemovedTimestamp != null && linkRemovedTimestamp.compareTo(peerTimestamp) > 0) {
624 // peer has a zombie link. update peer.
625 notifyPeer(peer, new InternalLinkRemovedEvent(key, linkRemovedTimestamp));
626 }
627 }
628
629 for (Map.Entry<LinkKey, Timestamp> entry : linkTombstones.entrySet()) {
630 LinkKey key = entry.getKey();
631 Timestamp peerTimestamp = entry.getValue();
632
633 ProviderId primaryProviderId = pickPrimaryProviderId(getLinkDescriptions(key));
634 if (primaryProviderId != null) {
635 if (!getLinkDescription(key, primaryProviderId).isNewer(peerTimestamp)) {
636 notifyDelegateIfNotNull(removeLinkInternal(key, peerTimestamp));
637 }
638 }
639 }
640 }
641
Madan Jampani2ff05592014-10-10 15:42:47 -0700642 private class InternalLinkEventListener implements ClusterMessageHandler {
643 @Override
644 public void handle(ClusterMessage message) {
645
646 log.info("Received link event from peer: {}", message.sender());
647 InternalLinkEvent event = (InternalLinkEvent) SERIALIZER.decode(message.payload());
648
649 ProviderId providerId = event.providerId();
650 Timestamped<LinkDescription> linkDescription = event.linkDescription();
651
652 notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
653 }
654 }
655
656 private class InternalLinkRemovedEventListener implements ClusterMessageHandler {
657 @Override
658 public void handle(ClusterMessage message) {
659
660 log.info("Received link removed event from peer: {}", message.sender());
661 InternalLinkRemovedEvent event = (InternalLinkRemovedEvent) SERIALIZER.decode(message.payload());
662
663 LinkKey linkKey = event.linkKey();
664 Timestamp timestamp = event.timestamp();
665
666 notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
667 }
668 }
Madan Jampania97e8202014-10-10 17:01:33 -0700669
670 private final class InternalLinkAntiEntropyAdvertisementListener implements ClusterMessageHandler {
671
672 @Override
673 public void handle(ClusterMessage message) {
Yuta HIGUCHI9a0a1d12014-10-13 22:38:02 -0700674 log.debug("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
Madan Jampania97e8202014-10-10 17:01:33 -0700675 LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
676 handleAntiEntropyAdvertisement(advertisement);
677 }
678 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700679}