blob: 4025d0c27a71b2f21080af4ea5a00c81a94cec2c [file] [log] [blame]
Madan Jampaniab994042014-10-13 15:34:04 -07001package org.onlab.onos.store.host.impl;
2
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -07003import com.google.common.collect.FluentIterable;
Madan Jampaniab994042014-10-13 15:34:04 -07004import com.google.common.collect.HashMultimap;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -07005import com.google.common.collect.ImmutableList;
Madan Jampaniab994042014-10-13 15:34:04 -07006import com.google.common.collect.ImmutableSet;
7import com.google.common.collect.Multimap;
8import com.google.common.collect.Sets;
9
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070010import org.apache.commons.lang3.RandomUtils;
Madan Jampaniab994042014-10-13 15:34:04 -070011import org.apache.felix.scr.annotations.Activate;
12import org.apache.felix.scr.annotations.Component;
13import org.apache.felix.scr.annotations.Deactivate;
14import org.apache.felix.scr.annotations.Reference;
15import org.apache.felix.scr.annotations.ReferenceCardinality;
16import org.apache.felix.scr.annotations.Service;
17import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070018import org.onlab.onos.cluster.ControllerNode;
19import org.onlab.onos.cluster.NodeId;
Madan Jampaniab994042014-10-13 15:34:04 -070020import org.onlab.onos.net.Annotations;
21import org.onlab.onos.net.ConnectPoint;
22import org.onlab.onos.net.DefaultHost;
23import org.onlab.onos.net.DeviceId;
24import org.onlab.onos.net.Host;
25import org.onlab.onos.net.HostId;
26import org.onlab.onos.net.HostLocation;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070027import org.onlab.onos.net.host.DefaultHostDescription;
Madan Jampaniab994042014-10-13 15:34:04 -070028import org.onlab.onos.net.host.HostClockService;
29import org.onlab.onos.net.host.HostDescription;
30import org.onlab.onos.net.host.HostEvent;
31import org.onlab.onos.net.host.HostStore;
32import org.onlab.onos.net.host.HostStoreDelegate;
33import org.onlab.onos.net.host.PortAddresses;
34import org.onlab.onos.net.provider.ProviderId;
35import org.onlab.onos.store.AbstractStore;
36import org.onlab.onos.store.Timestamp;
37import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani312a2982014-10-14 21:07:16 -070038import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Madan Jampani255618f2014-10-14 23:27:57 -070039import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani312a2982014-10-14 21:07:16 -070040import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampaniab994042014-10-13 15:34:04 -070041import org.onlab.onos.store.common.impl.Timestamped;
Madan Jampani312a2982014-10-14 21:07:16 -070042import org.onlab.onos.store.serializers.DistributedStoreSerializers;
43import org.onlab.onos.store.serializers.KryoSerializer;
Madan Jampaniab994042014-10-13 15:34:04 -070044import org.onlab.packet.IpPrefix;
45import org.onlab.packet.MacAddress;
46import org.onlab.packet.VlanId;
Madan Jampani312a2982014-10-14 21:07:16 -070047import org.onlab.util.KryoPool;
Madan Jampaniab994042014-10-13 15:34:04 -070048import org.slf4j.Logger;
49
Madan Jampani312a2982014-10-14 21:07:16 -070050import java.io.IOException;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070051import java.util.HashMap;
Madan Jampaniab994042014-10-13 15:34:04 -070052import java.util.HashSet;
53import java.util.Map;
54import java.util.Set;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070055import java.util.Map.Entry;
Madan Jampaniab994042014-10-13 15:34:04 -070056import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070057import java.util.concurrent.ScheduledExecutorService;
58import java.util.concurrent.TimeUnit;
Madan Jampaniab994042014-10-13 15:34:04 -070059
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070060import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
61import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Madan Jampaniab994042014-10-13 15:34:04 -070062import static org.onlab.onos.net.host.HostEvent.Type.*;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070063import static org.onlab.util.Tools.namedThreads;
Madan Jampaniab994042014-10-13 15:34:04 -070064import static org.slf4j.LoggerFactory.getLogger;
65
Yuta HIGUCHIa2639152014-10-14 15:08:10 -070066//TODO: multi-provider, annotation not supported.
Madan Jampaniab994042014-10-13 15:34:04 -070067/**
68 * Manages inventory of end-station hosts in distributed data store
69 * that uses optimistic replication and gossip based techniques.
70 */
71@Component(immediate = true)
72@Service
73public class GossipHostStore
74 extends AbstractStore<HostEvent, HostStoreDelegate>
75 implements HostStore {
76
77 private final Logger log = getLogger(getClass());
78
79 // Host inventory
80 private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
81
82 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
83
84 // Hosts tracked by their location
85 private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
86
87 private final Map<ConnectPoint, PortAddresses> portAddresses =
88 new ConcurrentHashMap<>();
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 protected HostClockService hostClockService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected ClusterCommunicationService clusterCommunicator;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected ClusterService clusterService;
98
Madan Jampani312a2982014-10-14 21:07:16 -070099 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
100 @Override
101 protected void setupKryoPool() {
102 serializerPool = KryoPool.newBuilder()
103 .register(DistributedStoreSerializers.COMMON)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700104 .register(InternalHostEvent.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700105 .register(InternalHostRemovedEvent.class)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700106 .register(HostFragmentId.class)
107 .register(HostAntiEntropyAdvertisement.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700108 .build()
109 .populate(1);
110 }
111 };
112
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700113 private ScheduledExecutorService executor;
114
Madan Jampaniab994042014-10-13 15:34:04 -0700115 @Activate
116 public void activate() {
Madan Jampani255618f2014-10-14 23:27:57 -0700117 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700118 GossipHostStoreMessageSubjects.HOST_UPDATED,
119 new InternalHostEventListener());
Madan Jampani255618f2014-10-14 23:27:57 -0700120 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700121 GossipHostStoreMessageSubjects.HOST_REMOVED,
122 new InternalHostRemovedEventListener());
123 clusterCommunicator.addSubscriber(
124 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
125 new InternalHostAntiEntropyAdvertisementListener());
126
127 executor =
128 newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
129
130 // TODO: Make these configurable
131 long initialDelaySec = 5;
132 long periodSec = 5;
133 // start anti-entropy thread
134 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
135 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani255618f2014-10-14 23:27:57 -0700136
Madan Jampaniab994042014-10-13 15:34:04 -0700137 log.info("Started");
138 }
139
140 @Deactivate
141 public void deactivate() {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700142 executor.shutdownNow();
143 try {
144 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
145 log.error("Timeout during executor shutdown");
146 }
147 } catch (InterruptedException e) {
148 log.error("Error during executor shutdown", e);
149 }
150
151 hosts.clear();
152 removedHosts.clear();
153 locations.clear();
154 portAddresses.clear();
155
Madan Jampaniab994042014-10-13 15:34:04 -0700156 log.info("Stopped");
157 }
158
159 @Override
160 public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
161 HostDescription hostDescription) {
162 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700163 HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
164 if (event != null) {
165 log.info("Notifying peers of a host topology event for providerId: "
166 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
167 try {
168 notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
169 } catch (IOException e) {
170 log.error("Failed to notify peers of a host topology event for providerId: "
171 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
172 }
173 }
174 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700175 }
176
177 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
178 HostDescription hostDescription, Timestamp timestamp) {
179 StoredHost host = hosts.get(hostId);
180 if (host == null) {
181 return createHost(providerId, hostId, hostDescription, timestamp);
182 }
183 return updateHost(providerId, host, hostDescription, timestamp);
184 }
185
186 // creates a new host and sends HOST_ADDED
187 private HostEvent createHost(ProviderId providerId, HostId hostId,
188 HostDescription descr, Timestamp timestamp) {
189 synchronized (this) {
190 // If this host was previously removed, first ensure
191 // this new request is "newer"
192 if (removedHosts.containsKey(hostId)) {
193 if (removedHosts.get(hostId).isNewer(timestamp)) {
194 return null;
195 } else {
196 removedHosts.remove(hostId);
197 }
198 }
199 StoredHost newhost = new StoredHost(providerId, hostId,
200 descr.hwAddress(),
201 descr.vlan(),
202 new Timestamped<>(descr.location(), timestamp),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700203 ImmutableSet.copyOf(descr.ipAddress()));
Madan Jampaniab994042014-10-13 15:34:04 -0700204 hosts.put(hostId, newhost);
205 locations.put(descr.location(), newhost);
206 return new HostEvent(HOST_ADDED, newhost);
207 }
208 }
209
210 // checks for type of update to host, sends appropriate event
211 private HostEvent updateHost(ProviderId providerId, StoredHost host,
212 HostDescription descr, Timestamp timestamp) {
213 HostEvent event;
214 if (!host.location.isNewer(timestamp) && !host.location().equals(descr.location())) {
215 host.setLocation(new Timestamped<>(descr.location(), timestamp));
216 return new HostEvent(HOST_MOVED, host);
217 }
218
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700219 if (host.ipAddresses().containsAll(descr.ipAddress())) {
Madan Jampaniab994042014-10-13 15:34:04 -0700220 return null;
221 }
222
223 Set<IpPrefix> addresses = new HashSet<>(host.ipAddresses());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700224 addresses.addAll(descr.ipAddress());
Madan Jampaniab994042014-10-13 15:34:04 -0700225 StoredHost updated = new StoredHost(providerId, host.id(),
226 host.mac(), host.vlan(),
227 host.location, addresses);
228 event = new HostEvent(HOST_UPDATED, updated);
229 synchronized (this) {
230 hosts.put(host.id(), updated);
231 locations.remove(host.location(), host);
232 locations.put(updated.location(), updated);
233 }
234 return event;
235 }
236
237 @Override
238 public HostEvent removeHost(HostId hostId) {
239 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700240 HostEvent event = removeHostInternal(hostId, timestamp);
241 if (event != null) {
242 log.info("Notifying peers of a host removed topology event for hostId: {}", hostId);
243 try {
244 notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
245 } catch (IOException e) {
246 log.info("Failed to notify peers of a host removed topology event for hostId: {}", hostId);
247 }
248 }
249 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700250 }
251
252 private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
253 synchronized (this) {
254 Host host = hosts.remove(hostId);
255 if (host != null) {
256 locations.remove((host.location()), host);
257 removedHosts.put(hostId, new Timestamped<>(host, timestamp));
258 return new HostEvent(HOST_REMOVED, host);
259 }
260 return null;
261 }
262 }
263
264 @Override
265 public int getHostCount() {
266 return hosts.size();
267 }
268
269 @Override
270 public Iterable<Host> getHosts() {
271 return ImmutableSet.<Host>copyOf(hosts.values());
272 }
273
274 @Override
275 public Host getHost(HostId hostId) {
276 return hosts.get(hostId);
277 }
278
279 @Override
280 public Set<Host> getHosts(VlanId vlanId) {
281 Set<Host> vlanset = new HashSet<>();
282 for (Host h : hosts.values()) {
283 if (h.vlan().equals(vlanId)) {
284 vlanset.add(h);
285 }
286 }
287 return vlanset;
288 }
289
290 @Override
291 public Set<Host> getHosts(MacAddress mac) {
292 Set<Host> macset = new HashSet<>();
293 for (Host h : hosts.values()) {
294 if (h.mac().equals(mac)) {
295 macset.add(h);
296 }
297 }
298 return macset;
299 }
300
301 @Override
302 public Set<Host> getHosts(IpPrefix ip) {
303 Set<Host> ipset = new HashSet<>();
304 for (Host h : hosts.values()) {
305 if (h.ipAddresses().contains(ip)) {
306 ipset.add(h);
307 }
308 }
309 return ipset;
310 }
311
312 @Override
313 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
314 return ImmutableSet.copyOf(locations.get(connectPoint));
315 }
316
317 @Override
318 public Set<Host> getConnectedHosts(DeviceId deviceId) {
319 Set<Host> hostset = new HashSet<>();
320 for (ConnectPoint p : locations.keySet()) {
321 if (p.deviceId().equals(deviceId)) {
322 hostset.addAll(locations.get(p));
323 }
324 }
325 return hostset;
326 }
327
328 @Override
329 public void updateAddressBindings(PortAddresses addresses) {
330 synchronized (portAddresses) {
331 PortAddresses existing = portAddresses.get(addresses.connectPoint());
332 if (existing == null) {
333 portAddresses.put(addresses.connectPoint(), addresses);
334 } else {
335 Set<IpPrefix> union = Sets.union(existing.ips(), addresses.ips())
336 .immutableCopy();
337
338 MacAddress newMac = (addresses.mac() == null) ? existing.mac()
339 : addresses.mac();
340
341 PortAddresses newAddresses =
342 new PortAddresses(addresses.connectPoint(), union, newMac);
343
344 portAddresses.put(newAddresses.connectPoint(), newAddresses);
345 }
346 }
347 }
348
349 @Override
350 public void removeAddressBindings(PortAddresses addresses) {
351 synchronized (portAddresses) {
352 PortAddresses existing = portAddresses.get(addresses.connectPoint());
353 if (existing != null) {
354 Set<IpPrefix> difference =
355 Sets.difference(existing.ips(), addresses.ips()).immutableCopy();
356
357 // If they removed the existing mac, set the new mac to null.
358 // Otherwise, keep the existing mac.
359 MacAddress newMac = existing.mac();
360 if (addresses.mac() != null && addresses.mac().equals(existing.mac())) {
361 newMac = null;
362 }
363
364 PortAddresses newAddresses =
365 new PortAddresses(addresses.connectPoint(), difference, newMac);
366
367 portAddresses.put(newAddresses.connectPoint(), newAddresses);
368 }
369 }
370 }
371
372 @Override
373 public void clearAddressBindings(ConnectPoint connectPoint) {
374 synchronized (portAddresses) {
375 portAddresses.remove(connectPoint);
376 }
377 }
378
379 @Override
380 public Set<PortAddresses> getAddressBindings() {
381 synchronized (portAddresses) {
382 return new HashSet<>(portAddresses.values());
383 }
384 }
385
386 @Override
387 public PortAddresses getAddressBindingsForPort(ConnectPoint connectPoint) {
388 PortAddresses addresses;
389
390 synchronized (portAddresses) {
391 addresses = portAddresses.get(connectPoint);
392 }
393
394 if (addresses == null) {
395 addresses = new PortAddresses(connectPoint, null, null);
396 }
397
398 return addresses;
399 }
400
401 // Auxiliary extension to allow location to mutate.
402 private class StoredHost extends DefaultHost {
403 private Timestamped<HostLocation> location;
404
405 /**
406 * Creates an end-station host using the supplied information.
407 *
408 * @param providerId provider identity
409 * @param id host identifier
410 * @param mac host MAC address
411 * @param vlan host VLAN identifier
412 * @param location host location
413 * @param ips host IP addresses
414 * @param annotations optional key/value annotations
415 */
416 public StoredHost(ProviderId providerId, HostId id,
417 MacAddress mac, VlanId vlan, Timestamped<HostLocation> location,
418 Set<IpPrefix> ips, Annotations... annotations) {
419 super(providerId, id, mac, vlan, location.value(), ips, annotations);
420 this.location = location;
421 }
422
423 void setLocation(Timestamped<HostLocation> location) {
424 this.location = location;
425 }
426
427 @Override
428 public HostLocation location() {
429 return location.value();
430 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700431
432 public Timestamp timestamp() {
433 return location.timestamp();
434 }
Madan Jampaniab994042014-10-13 15:34:04 -0700435 }
Madan Jampani312a2982014-10-14 21:07:16 -0700436
437 private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
438 broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event);
439 }
440
441 private void notifyPeers(InternalHostEvent event) throws IOException {
442 broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event);
443 }
444
445 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
446 ClusterMessage message = new ClusterMessage(
447 clusterService.getLocalNode().id(),
448 subject,
449 SERIALIZER.encode(event));
450 clusterCommunicator.broadcast(message);
451 }
Madan Jampani255618f2014-10-14 23:27:57 -0700452
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700453 private void unicastMessage(NodeId peer,
454 MessageSubject subject,
455 Object event) throws IOException {
456 ClusterMessage message = new ClusterMessage(
457 clusterService.getLocalNode().id(),
458 subject,
459 SERIALIZER.encode(event));
460 clusterCommunicator.unicast(message, peer);
461 }
462
Madan Jampani255618f2014-10-14 23:27:57 -0700463 private void notifyDelegateIfNotNull(HostEvent event) {
464 if (event != null) {
465 notifyDelegate(event);
466 }
467 }
468
469 private class InternalHostEventListener implements ClusterMessageHandler {
470 @Override
471 public void handle(ClusterMessage message) {
472
473 log.info("Received host update event from peer: {}", message.sender());
474 InternalHostEvent event = (InternalHostEvent) SERIALIZER.decode(message.payload());
475
476 ProviderId providerId = event.providerId();
477 HostId hostId = event.hostId();
478 HostDescription hostDescription = event.hostDescription();
479 Timestamp timestamp = event.timestamp();
480
481 notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp));
482 }
483 }
484
485 private class InternalHostRemovedEventListener implements ClusterMessageHandler {
486 @Override
487 public void handle(ClusterMessage message) {
488
489 log.info("Received host removed event from peer: {}", message.sender());
490 InternalHostRemovedEvent event = (InternalHostRemovedEvent) SERIALIZER.decode(message.payload());
491
492 HostId hostId = event.hostId();
493 Timestamp timestamp = event.timestamp();
494
495 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
496 }
497 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700498
499 private final class SendAdvertisementTask implements Runnable {
500
501 @Override
502 public void run() {
503 if (Thread.currentThread().isInterrupted()) {
504 log.info("Interrupted, quitting");
505 return;
506 }
507
508 try {
509 final NodeId self = clusterService.getLocalNode().id();
510 Set<ControllerNode> nodes = clusterService.getNodes();
511
512 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
513 .transform(toNodeId())
514 .toList();
515
516 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
517 log.debug("No other peers in the cluster.");
518 return;
519 }
520
521 NodeId peer;
522 do {
523 int idx = RandomUtils.nextInt(0, nodeIds.size());
524 peer = nodeIds.get(idx);
525 } while (peer.equals(self));
526
527 HostAntiEntropyAdvertisement ad = createAdvertisement();
528
529 if (Thread.currentThread().isInterrupted()) {
530 log.info("Interrupted, quitting");
531 return;
532 }
533
534 try {
535 unicastMessage(peer, GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
536 } catch (IOException e) {
537 log.debug("Failed to send anti-entropy advertisement", e);
538 return;
539 }
540 } catch (Exception e) {
541 // catch all Exception to avoid Scheduled task being suppressed.
542 log.error("Exception thrown while sending advertisement", e);
543 }
544 }
545 }
546
547 private HostAntiEntropyAdvertisement createAdvertisement() {
548 final NodeId self = clusterService.getLocalNode().id();
549
550 Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
551 Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
552
553 for (Entry<HostId, StoredHost> e : hosts.entrySet()) {
554
555 final HostId hostId = e.getKey();
556 final StoredHost hostInfo = e.getValue();
557 final ProviderId providerId = hostInfo.providerId();
558 timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
559 }
560
561 for (Entry<HostId, Timestamped<Host>> e : removedHosts.entrySet()) {
562 tombstones.put(e.getKey(), e.getValue().timestamp());
563 }
564
565 return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
566 }
567
568 private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
569
570 final NodeId sender = ad.sender();
571
572 for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
573 // for each locally live Hosts...
574 final HostId hostId = host.getKey();
575 final StoredHost localHost = host.getValue();
576 final ProviderId providerId = localHost.providerId();
577 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
578 final Timestamp localLiveTimestamp = localHost.timestamp();
579
580 Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
581 if (remoteTimestamp == null) {
582 remoteTimestamp = ad.tombstones().get(hostId);
583 }
584 if (remoteTimestamp == null ||
585 localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
586
587 // local is more recent, push
588 // TODO: annotation is lost
589 final HostDescription desc = new DefaultHostDescription(
590 localHost.mac(),
591 localHost.vlan(),
592 localHost.location(),
593 localHost.ipAddresses());
594 try {
595 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED,
596 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
597 } catch (IOException e1) {
598 log.debug("Failed to send advertisement response", e1);
599 }
600 }
601
602 final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
603 if (remoteDeadTimestamp != null &&
604 remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
605 // sender has recent remove
606 notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
607 }
608 }
609
610 for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
611 // for each locally dead Hosts
612 final HostId hostId = dead.getKey();
613 final Timestamp localDeadTimestamp = dead.getValue().timestamp();
614
615 // TODO: pick proper ProviderId, when supporting multi-provider
616 final ProviderId providerId = dead.getValue().value().providerId();
617 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
618
619 final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
620 if (remoteLiveTimestamp != null &&
621 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
622 // sender has zombie, push
623 try {
624 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED,
625 new InternalHostRemovedEvent(hostId, localDeadTimestamp));
626 } catch (IOException e1) {
627 log.debug("Failed to send advertisement response", e1);
628 }
629 }
630 }
631
632
633 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
634 // for each remote tombstone advertisement...
635 final HostId hostId = e.getKey();
636 final Timestamp adRemoveTimestamp = e.getValue();
637
638 final StoredHost storedHost = hosts.get(hostId);
639 if (storedHost == null) {
640 continue;
641 }
642 if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
643 // sender has recent remove info, locally remove
644 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
645 }
646 }
647 }
648
649 private final class InternalHostAntiEntropyAdvertisementListener implements
650 ClusterMessageHandler {
651
652 @Override
653 public void handle(ClusterMessage message) {
654 log.debug("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
655 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
656 handleAntiEntropyAdvertisement(advertisement);
657 }
658 }
Madan Jampaniab994042014-10-13 15:34:04 -0700659}