blob: 25ca29e01dede7a7e60f2ba0d64f4dfe99ffac1a [file] [log] [blame]
Madan Jampaniab994042014-10-13 15:34:04 -07001package org.onlab.onos.store.host.impl;
2
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -07003import com.google.common.collect.FluentIterable;
Madan Jampaniab994042014-10-13 15:34:04 -07004import com.google.common.collect.HashMultimap;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -07005import com.google.common.collect.ImmutableList;
Madan Jampaniab994042014-10-13 15:34:04 -07006import com.google.common.collect.ImmutableSet;
7import com.google.common.collect.Multimap;
8import com.google.common.collect.Sets;
9
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070010import org.apache.commons.lang3.RandomUtils;
Madan Jampaniab994042014-10-13 15:34:04 -070011import org.apache.felix.scr.annotations.Activate;
12import org.apache.felix.scr.annotations.Component;
13import org.apache.felix.scr.annotations.Deactivate;
14import org.apache.felix.scr.annotations.Reference;
15import org.apache.felix.scr.annotations.ReferenceCardinality;
16import org.apache.felix.scr.annotations.Service;
17import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070018import org.onlab.onos.cluster.ControllerNode;
19import org.onlab.onos.cluster.NodeId;
Madan Jampaniab994042014-10-13 15:34:04 -070020import org.onlab.onos.net.Annotations;
21import org.onlab.onos.net.ConnectPoint;
22import org.onlab.onos.net.DefaultHost;
23import org.onlab.onos.net.DeviceId;
24import org.onlab.onos.net.Host;
25import org.onlab.onos.net.HostId;
26import org.onlab.onos.net.HostLocation;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070027import org.onlab.onos.net.host.DefaultHostDescription;
Madan Jampaniab994042014-10-13 15:34:04 -070028import org.onlab.onos.net.host.HostClockService;
29import org.onlab.onos.net.host.HostDescription;
30import org.onlab.onos.net.host.HostEvent;
31import org.onlab.onos.net.host.HostStore;
32import org.onlab.onos.net.host.HostStoreDelegate;
Pavlin Radoslavov76b0ae22014-10-27 15:33:19 -070033import org.onlab.onos.net.host.InterfaceIpAddress;
Madan Jampaniab994042014-10-13 15:34:04 -070034import org.onlab.onos.net.host.PortAddresses;
35import org.onlab.onos.net.provider.ProviderId;
36import org.onlab.onos.store.AbstractStore;
37import org.onlab.onos.store.Timestamp;
38import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani312a2982014-10-14 21:07:16 -070039import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Madan Jampani255618f2014-10-14 23:27:57 -070040import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani312a2982014-10-14 21:07:16 -070041import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIeecee552014-10-16 14:09:01 -070042import org.onlab.onos.store.impl.Timestamped;
Madan Jampani312a2982014-10-14 21:07:16 -070043import org.onlab.onos.store.serializers.DistributedStoreSerializers;
44import org.onlab.onos.store.serializers.KryoSerializer;
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -070045import org.onlab.packet.IpAddress;
Madan Jampaniab994042014-10-13 15:34:04 -070046import org.onlab.packet.MacAddress;
47import org.onlab.packet.VlanId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070048import org.onlab.util.KryoNamespace;
Madan Jampaniab994042014-10-13 15:34:04 -070049import org.slf4j.Logger;
50
Madan Jampani312a2982014-10-14 21:07:16 -070051import java.io.IOException;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070052import java.util.HashMap;
Madan Jampaniab994042014-10-13 15:34:04 -070053import java.util.HashSet;
54import java.util.Map;
55import java.util.Set;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070056import java.util.Map.Entry;
Madan Jampaniab994042014-10-13 15:34:04 -070057import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070058import java.util.concurrent.ScheduledExecutorService;
59import java.util.concurrent.TimeUnit;
Madan Jampaniab994042014-10-13 15:34:04 -070060
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070061import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
62import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Madan Jampaniab994042014-10-13 15:34:04 -070063import static org.onlab.onos.net.host.HostEvent.Type.*;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070064import static org.onlab.util.Tools.namedThreads;
Madan Jampaniab994042014-10-13 15:34:04 -070065import static org.slf4j.LoggerFactory.getLogger;
66
Yuta HIGUCHIa2639152014-10-14 15:08:10 -070067//TODO: multi-provider, annotation not supported.
Madan Jampaniab994042014-10-13 15:34:04 -070068/**
69 * Manages inventory of end-station hosts in distributed data store
70 * that uses optimistic replication and gossip based techniques.
71 */
72@Component(immediate = true)
73@Service
74public class GossipHostStore
75 extends AbstractStore<HostEvent, HostStoreDelegate>
76 implements HostStore {
77
78 private final Logger log = getLogger(getClass());
79
80 // Host inventory
81 private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
82
83 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
84
85 // Hosts tracked by their location
86 private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
87
88 private final Map<ConnectPoint, PortAddresses> portAddresses =
89 new ConcurrentHashMap<>();
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
92 protected HostClockService hostClockService;
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95 protected ClusterCommunicationService clusterCommunicator;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected ClusterService clusterService;
99
Madan Jampani312a2982014-10-14 21:07:16 -0700100 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
101 @Override
102 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700103 serializerPool = KryoNamespace.newBuilder()
Madan Jampani312a2982014-10-14 21:07:16 -0700104 .register(DistributedStoreSerializers.COMMON)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700105 .register(InternalHostEvent.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700106 .register(InternalHostRemovedEvent.class)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700107 .register(HostFragmentId.class)
108 .register(HostAntiEntropyAdvertisement.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700109 .build()
110 .populate(1);
111 }
112 };
113
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700114 private ScheduledExecutorService executor;
115
Madan Jampaniab994042014-10-13 15:34:04 -0700116 @Activate
117 public void activate() {
Madan Jampani255618f2014-10-14 23:27:57 -0700118 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700119 GossipHostStoreMessageSubjects.HOST_UPDATED,
120 new InternalHostEventListener());
Madan Jampani255618f2014-10-14 23:27:57 -0700121 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700122 GossipHostStoreMessageSubjects.HOST_REMOVED,
123 new InternalHostRemovedEventListener());
124 clusterCommunicator.addSubscriber(
125 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
126 new InternalHostAntiEntropyAdvertisementListener());
127
128 executor =
129 newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
130
131 // TODO: Make these configurable
132 long initialDelaySec = 5;
133 long periodSec = 5;
134 // start anti-entropy thread
135 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
136 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani255618f2014-10-14 23:27:57 -0700137
Madan Jampaniab994042014-10-13 15:34:04 -0700138 log.info("Started");
139 }
140
141 @Deactivate
142 public void deactivate() {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700143 executor.shutdownNow();
144 try {
145 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
146 log.error("Timeout during executor shutdown");
147 }
148 } catch (InterruptedException e) {
149 log.error("Error during executor shutdown", e);
150 }
151
152 hosts.clear();
153 removedHosts.clear();
154 locations.clear();
155 portAddresses.clear();
156
Madan Jampaniab994042014-10-13 15:34:04 -0700157 log.info("Stopped");
158 }
159
160 @Override
161 public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
162 HostDescription hostDescription) {
163 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700164 HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
165 if (event != null) {
166 log.info("Notifying peers of a host topology event for providerId: "
167 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
168 try {
169 notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
170 } catch (IOException e) {
171 log.error("Failed to notify peers of a host topology event for providerId: "
172 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
173 }
174 }
175 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700176 }
177
178 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
179 HostDescription hostDescription, Timestamp timestamp) {
180 StoredHost host = hosts.get(hostId);
181 if (host == null) {
182 return createHost(providerId, hostId, hostDescription, timestamp);
183 }
184 return updateHost(providerId, host, hostDescription, timestamp);
185 }
186
187 // creates a new host and sends HOST_ADDED
188 private HostEvent createHost(ProviderId providerId, HostId hostId,
189 HostDescription descr, Timestamp timestamp) {
190 synchronized (this) {
191 // If this host was previously removed, first ensure
192 // this new request is "newer"
193 if (removedHosts.containsKey(hostId)) {
194 if (removedHosts.get(hostId).isNewer(timestamp)) {
195 return null;
196 } else {
197 removedHosts.remove(hostId);
198 }
199 }
200 StoredHost newhost = new StoredHost(providerId, hostId,
201 descr.hwAddress(),
202 descr.vlan(),
203 new Timestamped<>(descr.location(), timestamp),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700204 ImmutableSet.copyOf(descr.ipAddress()));
Madan Jampaniab994042014-10-13 15:34:04 -0700205 hosts.put(hostId, newhost);
206 locations.put(descr.location(), newhost);
207 return new HostEvent(HOST_ADDED, newhost);
208 }
209 }
210
211 // checks for type of update to host, sends appropriate event
212 private HostEvent updateHost(ProviderId providerId, StoredHost host,
213 HostDescription descr, Timestamp timestamp) {
214 HostEvent event;
215 if (!host.location.isNewer(timestamp) && !host.location().equals(descr.location())) {
216 host.setLocation(new Timestamped<>(descr.location(), timestamp));
217 return new HostEvent(HOST_MOVED, host);
218 }
219
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700220 if (host.ipAddresses().containsAll(descr.ipAddress())) {
Madan Jampaniab994042014-10-13 15:34:04 -0700221 return null;
222 }
223
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700224 Set<IpAddress> addresses = new HashSet<>(host.ipAddresses());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700225 addresses.addAll(descr.ipAddress());
Madan Jampaniab994042014-10-13 15:34:04 -0700226 StoredHost updated = new StoredHost(providerId, host.id(),
227 host.mac(), host.vlan(),
228 host.location, addresses);
229 event = new HostEvent(HOST_UPDATED, updated);
230 synchronized (this) {
231 hosts.put(host.id(), updated);
232 locations.remove(host.location(), host);
233 locations.put(updated.location(), updated);
234 }
235 return event;
236 }
237
238 @Override
239 public HostEvent removeHost(HostId hostId) {
240 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700241 HostEvent event = removeHostInternal(hostId, timestamp);
242 if (event != null) {
243 log.info("Notifying peers of a host removed topology event for hostId: {}", hostId);
244 try {
245 notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
246 } catch (IOException e) {
247 log.info("Failed to notify peers of a host removed topology event for hostId: {}", hostId);
248 }
249 }
250 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700251 }
252
253 private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
254 synchronized (this) {
255 Host host = hosts.remove(hostId);
256 if (host != null) {
257 locations.remove((host.location()), host);
258 removedHosts.put(hostId, new Timestamped<>(host, timestamp));
259 return new HostEvent(HOST_REMOVED, host);
260 }
261 return null;
262 }
263 }
264
265 @Override
266 public int getHostCount() {
267 return hosts.size();
268 }
269
270 @Override
271 public Iterable<Host> getHosts() {
272 return ImmutableSet.<Host>copyOf(hosts.values());
273 }
274
275 @Override
276 public Host getHost(HostId hostId) {
277 return hosts.get(hostId);
278 }
279
280 @Override
281 public Set<Host> getHosts(VlanId vlanId) {
282 Set<Host> vlanset = new HashSet<>();
283 for (Host h : hosts.values()) {
284 if (h.vlan().equals(vlanId)) {
285 vlanset.add(h);
286 }
287 }
288 return vlanset;
289 }
290
291 @Override
292 public Set<Host> getHosts(MacAddress mac) {
293 Set<Host> macset = new HashSet<>();
294 for (Host h : hosts.values()) {
295 if (h.mac().equals(mac)) {
296 macset.add(h);
297 }
298 }
299 return macset;
300 }
301
302 @Override
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700303 public Set<Host> getHosts(IpAddress ip) {
Madan Jampaniab994042014-10-13 15:34:04 -0700304 Set<Host> ipset = new HashSet<>();
305 for (Host h : hosts.values()) {
306 if (h.ipAddresses().contains(ip)) {
307 ipset.add(h);
308 }
309 }
310 return ipset;
311 }
312
313 @Override
314 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
315 return ImmutableSet.copyOf(locations.get(connectPoint));
316 }
317
318 @Override
319 public Set<Host> getConnectedHosts(DeviceId deviceId) {
320 Set<Host> hostset = new HashSet<>();
321 for (ConnectPoint p : locations.keySet()) {
322 if (p.deviceId().equals(deviceId)) {
323 hostset.addAll(locations.get(p));
324 }
325 }
326 return hostset;
327 }
328
329 @Override
330 public void updateAddressBindings(PortAddresses addresses) {
331 synchronized (portAddresses) {
332 PortAddresses existing = portAddresses.get(addresses.connectPoint());
333 if (existing == null) {
334 portAddresses.put(addresses.connectPoint(), addresses);
335 } else {
Pavlin Radoslavov76b0ae22014-10-27 15:33:19 -0700336 Set<InterfaceIpAddress> union =
337 Sets.union(existing.ipAddresses(),
338 addresses.ipAddresses()).immutableCopy();
Madan Jampaniab994042014-10-13 15:34:04 -0700339
340 MacAddress newMac = (addresses.mac() == null) ? existing.mac()
341 : addresses.mac();
342
343 PortAddresses newAddresses =
344 new PortAddresses(addresses.connectPoint(), union, newMac);
345
346 portAddresses.put(newAddresses.connectPoint(), newAddresses);
347 }
348 }
349 }
350
351 @Override
352 public void removeAddressBindings(PortAddresses addresses) {
353 synchronized (portAddresses) {
354 PortAddresses existing = portAddresses.get(addresses.connectPoint());
355 if (existing != null) {
Pavlin Radoslavov76b0ae22014-10-27 15:33:19 -0700356 Set<InterfaceIpAddress> difference =
357 Sets.difference(existing.ipAddresses(),
358 addresses.ipAddresses()).immutableCopy();
Madan Jampaniab994042014-10-13 15:34:04 -0700359
360 // If they removed the existing mac, set the new mac to null.
361 // Otherwise, keep the existing mac.
362 MacAddress newMac = existing.mac();
363 if (addresses.mac() != null && addresses.mac().equals(existing.mac())) {
364 newMac = null;
365 }
366
367 PortAddresses newAddresses =
368 new PortAddresses(addresses.connectPoint(), difference, newMac);
369
370 portAddresses.put(newAddresses.connectPoint(), newAddresses);
371 }
372 }
373 }
374
375 @Override
376 public void clearAddressBindings(ConnectPoint connectPoint) {
377 synchronized (portAddresses) {
378 portAddresses.remove(connectPoint);
379 }
380 }
381
382 @Override
383 public Set<PortAddresses> getAddressBindings() {
384 synchronized (portAddresses) {
385 return new HashSet<>(portAddresses.values());
386 }
387 }
388
389 @Override
390 public PortAddresses getAddressBindingsForPort(ConnectPoint connectPoint) {
391 PortAddresses addresses;
392
393 synchronized (portAddresses) {
394 addresses = portAddresses.get(connectPoint);
395 }
396
397 if (addresses == null) {
398 addresses = new PortAddresses(connectPoint, null, null);
399 }
400
401 return addresses;
402 }
403
404 // Auxiliary extension to allow location to mutate.
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700405 private static final class StoredHost extends DefaultHost {
Madan Jampaniab994042014-10-13 15:34:04 -0700406 private Timestamped<HostLocation> location;
407
408 /**
409 * Creates an end-station host using the supplied information.
410 *
411 * @param providerId provider identity
412 * @param id host identifier
413 * @param mac host MAC address
414 * @param vlan host VLAN identifier
415 * @param location host location
416 * @param ips host IP addresses
417 * @param annotations optional key/value annotations
418 */
419 public StoredHost(ProviderId providerId, HostId id,
420 MacAddress mac, VlanId vlan, Timestamped<HostLocation> location,
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700421 Set<IpAddress> ips, Annotations... annotations) {
Madan Jampaniab994042014-10-13 15:34:04 -0700422 super(providerId, id, mac, vlan, location.value(), ips, annotations);
423 this.location = location;
424 }
425
426 void setLocation(Timestamped<HostLocation> location) {
427 this.location = location;
428 }
429
430 @Override
431 public HostLocation location() {
432 return location.value();
433 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700434
435 public Timestamp timestamp() {
436 return location.timestamp();
437 }
Madan Jampaniab994042014-10-13 15:34:04 -0700438 }
Madan Jampani312a2982014-10-14 21:07:16 -0700439
440 private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
441 broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event);
442 }
443
444 private void notifyPeers(InternalHostEvent event) throws IOException {
445 broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event);
446 }
447
448 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
449 ClusterMessage message = new ClusterMessage(
450 clusterService.getLocalNode().id(),
451 subject,
452 SERIALIZER.encode(event));
453 clusterCommunicator.broadcast(message);
454 }
Madan Jampani255618f2014-10-14 23:27:57 -0700455
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700456 private void unicastMessage(NodeId peer,
457 MessageSubject subject,
458 Object event) throws IOException {
459 ClusterMessage message = new ClusterMessage(
460 clusterService.getLocalNode().id(),
461 subject,
462 SERIALIZER.encode(event));
463 clusterCommunicator.unicast(message, peer);
464 }
465
Madan Jampani255618f2014-10-14 23:27:57 -0700466 private void notifyDelegateIfNotNull(HostEvent event) {
467 if (event != null) {
468 notifyDelegate(event);
469 }
470 }
471
472 private class InternalHostEventListener implements ClusterMessageHandler {
473 @Override
474 public void handle(ClusterMessage message) {
475
476 log.info("Received host update event from peer: {}", message.sender());
477 InternalHostEvent event = (InternalHostEvent) SERIALIZER.decode(message.payload());
478
479 ProviderId providerId = event.providerId();
480 HostId hostId = event.hostId();
481 HostDescription hostDescription = event.hostDescription();
482 Timestamp timestamp = event.timestamp();
483
484 notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp));
485 }
486 }
487
488 private class InternalHostRemovedEventListener implements ClusterMessageHandler {
489 @Override
490 public void handle(ClusterMessage message) {
491
492 log.info("Received host removed event from peer: {}", message.sender());
493 InternalHostRemovedEvent event = (InternalHostRemovedEvent) SERIALIZER.decode(message.payload());
494
495 HostId hostId = event.hostId();
496 Timestamp timestamp = event.timestamp();
497
498 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
499 }
500 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700501
502 private final class SendAdvertisementTask implements Runnable {
503
504 @Override
505 public void run() {
506 if (Thread.currentThread().isInterrupted()) {
507 log.info("Interrupted, quitting");
508 return;
509 }
510
511 try {
512 final NodeId self = clusterService.getLocalNode().id();
513 Set<ControllerNode> nodes = clusterService.getNodes();
514
515 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
516 .transform(toNodeId())
517 .toList();
518
519 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
520 log.debug("No other peers in the cluster.");
521 return;
522 }
523
524 NodeId peer;
525 do {
526 int idx = RandomUtils.nextInt(0, nodeIds.size());
527 peer = nodeIds.get(idx);
528 } while (peer.equals(self));
529
530 HostAntiEntropyAdvertisement ad = createAdvertisement();
531
532 if (Thread.currentThread().isInterrupted()) {
533 log.info("Interrupted, quitting");
534 return;
535 }
536
537 try {
538 unicastMessage(peer, GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
539 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -0700540 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700541 return;
542 }
543 } catch (Exception e) {
544 // catch all Exception to avoid Scheduled task being suppressed.
545 log.error("Exception thrown while sending advertisement", e);
546 }
547 }
548 }
549
550 private HostAntiEntropyAdvertisement createAdvertisement() {
551 final NodeId self = clusterService.getLocalNode().id();
552
553 Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
554 Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
555
556 for (Entry<HostId, StoredHost> e : hosts.entrySet()) {
557
558 final HostId hostId = e.getKey();
559 final StoredHost hostInfo = e.getValue();
560 final ProviderId providerId = hostInfo.providerId();
561 timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
562 }
563
564 for (Entry<HostId, Timestamped<Host>> e : removedHosts.entrySet()) {
565 tombstones.put(e.getKey(), e.getValue().timestamp());
566 }
567
568 return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
569 }
570
571 private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
572
573 final NodeId sender = ad.sender();
574
575 for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
576 // for each locally live Hosts...
577 final HostId hostId = host.getKey();
578 final StoredHost localHost = host.getValue();
579 final ProviderId providerId = localHost.providerId();
580 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
581 final Timestamp localLiveTimestamp = localHost.timestamp();
582
583 Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
584 if (remoteTimestamp == null) {
585 remoteTimestamp = ad.tombstones().get(hostId);
586 }
587 if (remoteTimestamp == null ||
588 localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
589
590 // local is more recent, push
591 // TODO: annotation is lost
592 final HostDescription desc = new DefaultHostDescription(
593 localHost.mac(),
594 localHost.vlan(),
595 localHost.location(),
596 localHost.ipAddresses());
597 try {
598 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED,
599 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
600 } catch (IOException e1) {
601 log.debug("Failed to send advertisement response", e1);
602 }
603 }
604
605 final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
606 if (remoteDeadTimestamp != null &&
607 remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
608 // sender has recent remove
609 notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
610 }
611 }
612
613 for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
614 // for each locally dead Hosts
615 final HostId hostId = dead.getKey();
616 final Timestamp localDeadTimestamp = dead.getValue().timestamp();
617
618 // TODO: pick proper ProviderId, when supporting multi-provider
619 final ProviderId providerId = dead.getValue().value().providerId();
620 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
621
622 final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
623 if (remoteLiveTimestamp != null &&
624 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
625 // sender has zombie, push
626 try {
627 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED,
628 new InternalHostRemovedEvent(hostId, localDeadTimestamp));
629 } catch (IOException e1) {
630 log.debug("Failed to send advertisement response", e1);
631 }
632 }
633 }
634
635
636 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
637 // for each remote tombstone advertisement...
638 final HostId hostId = e.getKey();
639 final Timestamp adRemoveTimestamp = e.getValue();
640
641 final StoredHost storedHost = hosts.get(hostId);
642 if (storedHost == null) {
643 continue;
644 }
645 if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
646 // sender has recent remove info, locally remove
647 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
648 }
649 }
650 }
651
652 private final class InternalHostAntiEntropyAdvertisementListener implements
653 ClusterMessageHandler {
654
655 @Override
656 public void handle(ClusterMessage message) {
657 log.debug("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
658 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
659 handleAntiEntropyAdvertisement(advertisement);
660 }
661 }
Madan Jampaniab994042014-10-13 15:34:04 -0700662}