blob: 3b233f721cc38bb640fce7a90371851951ef5e05 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniab994042014-10-13 15:34:04 -070016package org.onlab.onos.store.host.impl;
17
Jonathan Harta887ba82014-11-03 15:20:52 -080018import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
19import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Thomas Vachuskacd2920c2014-11-19 14:49:55 -080020import static org.onlab.onos.net.DefaultAnnotations.merge;
Jonathan Harta887ba82014-11-03 15:20:52 -080021import static org.onlab.onos.net.host.HostEvent.Type.HOST_ADDED;
22import static org.onlab.onos.net.host.HostEvent.Type.HOST_MOVED;
23import static org.onlab.onos.net.host.HostEvent.Type.HOST_REMOVED;
24import static org.onlab.onos.net.host.HostEvent.Type.HOST_UPDATED;
25import static org.onlab.util.Tools.namedThreads;
26import static org.slf4j.LoggerFactory.getLogger;
27
28import java.io.IOException;
29import java.util.Collections;
30import java.util.HashMap;
31import java.util.HashSet;
32import java.util.Map;
33import java.util.Map.Entry;
34import java.util.Set;
35import java.util.concurrent.ConcurrentHashMap;
36import java.util.concurrent.ScheduledExecutorService;
37import java.util.concurrent.TimeUnit;
Madan Jampaniab994042014-10-13 15:34:04 -070038
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070039import org.apache.commons.lang3.RandomUtils;
Madan Jampaniab994042014-10-13 15:34:04 -070040import org.apache.felix.scr.annotations.Activate;
41import org.apache.felix.scr.annotations.Component;
42import org.apache.felix.scr.annotations.Deactivate;
43import org.apache.felix.scr.annotations.Reference;
44import org.apache.felix.scr.annotations.ReferenceCardinality;
45import org.apache.felix.scr.annotations.Service;
46import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070047import org.onlab.onos.cluster.ControllerNode;
48import org.onlab.onos.cluster.NodeId;
Madan Jampaniab994042014-10-13 15:34:04 -070049import org.onlab.onos.net.Annotations;
50import org.onlab.onos.net.ConnectPoint;
Thomas Vachuskacd2920c2014-11-19 14:49:55 -080051import org.onlab.onos.net.DefaultAnnotations;
Madan Jampaniab994042014-10-13 15:34:04 -070052import org.onlab.onos.net.DefaultHost;
53import org.onlab.onos.net.DeviceId;
54import org.onlab.onos.net.Host;
55import org.onlab.onos.net.HostId;
56import org.onlab.onos.net.HostLocation;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070057import org.onlab.onos.net.host.DefaultHostDescription;
Madan Jampaniab994042014-10-13 15:34:04 -070058import org.onlab.onos.net.host.HostClockService;
59import org.onlab.onos.net.host.HostDescription;
60import org.onlab.onos.net.host.HostEvent;
61import org.onlab.onos.net.host.HostStore;
62import org.onlab.onos.net.host.HostStoreDelegate;
63import org.onlab.onos.net.host.PortAddresses;
64import org.onlab.onos.net.provider.ProviderId;
65import org.onlab.onos.store.AbstractStore;
66import org.onlab.onos.store.Timestamp;
67import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani312a2982014-10-14 21:07:16 -070068import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Madan Jampani255618f2014-10-14 23:27:57 -070069import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani312a2982014-10-14 21:07:16 -070070import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIeecee552014-10-16 14:09:01 -070071import org.onlab.onos.store.impl.Timestamped;
Madan Jampani312a2982014-10-14 21:07:16 -070072import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI60a190b2014-11-07 16:24:47 -080073import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -070074import org.onlab.packet.IpAddress;
Madan Jampaniab994042014-10-13 15:34:04 -070075import org.onlab.packet.MacAddress;
76import org.onlab.packet.VlanId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070077import org.onlab.util.KryoNamespace;
Madan Jampaniab994042014-10-13 15:34:04 -070078import org.slf4j.Logger;
79
Jonathan Harta887ba82014-11-03 15:20:52 -080080import com.google.common.collect.FluentIterable;
81import com.google.common.collect.HashMultimap;
82import com.google.common.collect.ImmutableList;
83import com.google.common.collect.ImmutableSet;
84import com.google.common.collect.Multimap;
85import com.google.common.collect.Multimaps;
86import com.google.common.collect.SetMultimap;
Madan Jampaniab994042014-10-13 15:34:04 -070087
Yuta HIGUCHIa2639152014-10-14 15:08:10 -070088//TODO: multi-provider, annotation not supported.
Madan Jampaniab994042014-10-13 15:34:04 -070089/**
90 * Manages inventory of end-station hosts in distributed data store
91 * that uses optimistic replication and gossip based techniques.
92 */
93@Component(immediate = true)
94@Service
95public class GossipHostStore
96 extends AbstractStore<HostEvent, HostStoreDelegate>
97 implements HostStore {
98
99 private final Logger log = getLogger(getClass());
100
101 // Host inventory
102 private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
103
104 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
105
106 // Hosts tracked by their location
107 private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
108
Jonathan Harta887ba82014-11-03 15:20:52 -0800109 private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
110 Multimaps.synchronizedSetMultimap(
111 HashMultimap.<ConnectPoint, PortAddresses>create());
Madan Jampaniab994042014-10-13 15:34:04 -0700112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 protected HostClockService hostClockService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
117 protected ClusterCommunicationService clusterCommunicator;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected ClusterService clusterService;
121
Madan Jampani312a2982014-10-14 21:07:16 -0700122 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
123 @Override
124 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700125 serializerPool = KryoNamespace.newBuilder()
Madan Jampani312a2982014-10-14 21:07:16 -0700126 .register(DistributedStoreSerializers.COMMON)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700127 .register(InternalHostEvent.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700128 .register(InternalHostRemovedEvent.class)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700129 .register(HostFragmentId.class)
130 .register(HostAntiEntropyAdvertisement.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700131 .build()
132 .populate(1);
133 }
134 };
135
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700136 private ScheduledExecutorService executor;
137
Madan Jampaniab994042014-10-13 15:34:04 -0700138 @Activate
139 public void activate() {
Madan Jampani255618f2014-10-14 23:27:57 -0700140 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700141 GossipHostStoreMessageSubjects.HOST_UPDATED,
142 new InternalHostEventListener());
Madan Jampani255618f2014-10-14 23:27:57 -0700143 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700144 GossipHostStoreMessageSubjects.HOST_REMOVED,
145 new InternalHostRemovedEventListener());
146 clusterCommunicator.addSubscriber(
147 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
148 new InternalHostAntiEntropyAdvertisementListener());
149
150 executor =
151 newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
152
153 // TODO: Make these configurable
154 long initialDelaySec = 5;
155 long periodSec = 5;
156 // start anti-entropy thread
157 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
158 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani255618f2014-10-14 23:27:57 -0700159
Madan Jampaniab994042014-10-13 15:34:04 -0700160 log.info("Started");
161 }
162
163 @Deactivate
164 public void deactivate() {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700165 executor.shutdownNow();
166 try {
167 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
168 log.error("Timeout during executor shutdown");
169 }
170 } catch (InterruptedException e) {
171 log.error("Error during executor shutdown", e);
172 }
173
174 hosts.clear();
175 removedHosts.clear();
176 locations.clear();
177 portAddresses.clear();
178
Madan Jampaniab994042014-10-13 15:34:04 -0700179 log.info("Stopped");
180 }
181
182 @Override
183 public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
184 HostDescription hostDescription) {
185 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700186 HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
187 if (event != null) {
188 log.info("Notifying peers of a host topology event for providerId: "
189 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
190 try {
191 notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
192 } catch (IOException e) {
193 log.error("Failed to notify peers of a host topology event for providerId: "
194 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
195 }
196 }
197 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700198 }
199
200 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
201 HostDescription hostDescription, Timestamp timestamp) {
202 StoredHost host = hosts.get(hostId);
203 if (host == null) {
204 return createHost(providerId, hostId, hostDescription, timestamp);
205 }
206 return updateHost(providerId, host, hostDescription, timestamp);
207 }
208
209 // creates a new host and sends HOST_ADDED
210 private HostEvent createHost(ProviderId providerId, HostId hostId,
211 HostDescription descr, Timestamp timestamp) {
212 synchronized (this) {
213 // If this host was previously removed, first ensure
214 // this new request is "newer"
215 if (removedHosts.containsKey(hostId)) {
216 if (removedHosts.get(hostId).isNewer(timestamp)) {
217 return null;
218 } else {
219 removedHosts.remove(hostId);
220 }
221 }
222 StoredHost newhost = new StoredHost(providerId, hostId,
223 descr.hwAddress(),
224 descr.vlan(),
225 new Timestamped<>(descr.location(), timestamp),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700226 ImmutableSet.copyOf(descr.ipAddress()));
Madan Jampaniab994042014-10-13 15:34:04 -0700227 hosts.put(hostId, newhost);
228 locations.put(descr.location(), newhost);
229 return new HostEvent(HOST_ADDED, newhost);
230 }
231 }
232
233 // checks for type of update to host, sends appropriate event
234 private HostEvent updateHost(ProviderId providerId, StoredHost host,
235 HostDescription descr, Timestamp timestamp) {
236 HostEvent event;
237 if (!host.location.isNewer(timestamp) && !host.location().equals(descr.location())) {
238 host.setLocation(new Timestamped<>(descr.location(), timestamp));
239 return new HostEvent(HOST_MOVED, host);
240 }
241
Thomas Vachuskacd2920c2014-11-19 14:49:55 -0800242 if (host.ipAddresses().containsAll(descr.ipAddress()) &&
243 descr.annotations().keys().isEmpty()) {
Madan Jampaniab994042014-10-13 15:34:04 -0700244 return null;
245 }
246
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700247 Set<IpAddress> addresses = new HashSet<>(host.ipAddresses());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700248 addresses.addAll(descr.ipAddress());
Thomas Vachuskacd2920c2014-11-19 14:49:55 -0800249 Annotations annotations = merge((DefaultAnnotations) host.annotations(),
250 descr.annotations());
Madan Jampaniab994042014-10-13 15:34:04 -0700251 StoredHost updated = new StoredHost(providerId, host.id(),
252 host.mac(), host.vlan(),
Thomas Vachuskacd2920c2014-11-19 14:49:55 -0800253 host.location, addresses,
254 annotations);
Madan Jampaniab994042014-10-13 15:34:04 -0700255 event = new HostEvent(HOST_UPDATED, updated);
256 synchronized (this) {
257 hosts.put(host.id(), updated);
258 locations.remove(host.location(), host);
259 locations.put(updated.location(), updated);
260 }
261 return event;
262 }
263
264 @Override
265 public HostEvent removeHost(HostId hostId) {
266 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700267 HostEvent event = removeHostInternal(hostId, timestamp);
268 if (event != null) {
269 log.info("Notifying peers of a host removed topology event for hostId: {}", hostId);
270 try {
271 notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
272 } catch (IOException e) {
273 log.info("Failed to notify peers of a host removed topology event for hostId: {}", hostId);
274 }
275 }
276 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700277 }
278
279 private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
280 synchronized (this) {
281 Host host = hosts.remove(hostId);
282 if (host != null) {
283 locations.remove((host.location()), host);
284 removedHosts.put(hostId, new Timestamped<>(host, timestamp));
285 return new HostEvent(HOST_REMOVED, host);
286 }
287 return null;
288 }
289 }
290
291 @Override
292 public int getHostCount() {
293 return hosts.size();
294 }
295
296 @Override
297 public Iterable<Host> getHosts() {
298 return ImmutableSet.<Host>copyOf(hosts.values());
299 }
300
301 @Override
302 public Host getHost(HostId hostId) {
303 return hosts.get(hostId);
304 }
305
306 @Override
307 public Set<Host> getHosts(VlanId vlanId) {
308 Set<Host> vlanset = new HashSet<>();
309 for (Host h : hosts.values()) {
310 if (h.vlan().equals(vlanId)) {
311 vlanset.add(h);
312 }
313 }
314 return vlanset;
315 }
316
317 @Override
318 public Set<Host> getHosts(MacAddress mac) {
319 Set<Host> macset = new HashSet<>();
320 for (Host h : hosts.values()) {
321 if (h.mac().equals(mac)) {
322 macset.add(h);
323 }
324 }
325 return macset;
326 }
327
328 @Override
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700329 public Set<Host> getHosts(IpAddress ip) {
Madan Jampaniab994042014-10-13 15:34:04 -0700330 Set<Host> ipset = new HashSet<>();
331 for (Host h : hosts.values()) {
332 if (h.ipAddresses().contains(ip)) {
333 ipset.add(h);
334 }
335 }
336 return ipset;
337 }
338
339 @Override
340 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
341 return ImmutableSet.copyOf(locations.get(connectPoint));
342 }
343
344 @Override
345 public Set<Host> getConnectedHosts(DeviceId deviceId) {
346 Set<Host> hostset = new HashSet<>();
347 for (ConnectPoint p : locations.keySet()) {
348 if (p.deviceId().equals(deviceId)) {
349 hostset.addAll(locations.get(p));
350 }
351 }
352 return hostset;
353 }
354
355 @Override
356 public void updateAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800357 portAddresses.put(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700358 }
359
360 @Override
361 public void removeAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800362 portAddresses.remove(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700363 }
364
365 @Override
366 public void clearAddressBindings(ConnectPoint connectPoint) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800367 portAddresses.removeAll(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700368 }
369
370 @Override
371 public Set<PortAddresses> getAddressBindings() {
372 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800373 return ImmutableSet.copyOf(portAddresses.values());
Madan Jampaniab994042014-10-13 15:34:04 -0700374 }
375 }
376
377 @Override
Jonathan Harta887ba82014-11-03 15:20:52 -0800378 public Set<PortAddresses> getAddressBindingsForPort(ConnectPoint connectPoint) {
Madan Jampaniab994042014-10-13 15:34:04 -0700379 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800380 Set<PortAddresses> addresses = portAddresses.get(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700381
Jonathan Harta887ba82014-11-03 15:20:52 -0800382 if (addresses == null) {
383 return Collections.emptySet();
384 } else {
385 return ImmutableSet.copyOf(addresses);
386 }
Madan Jampaniab994042014-10-13 15:34:04 -0700387 }
Madan Jampaniab994042014-10-13 15:34:04 -0700388 }
389
390 // Auxiliary extension to allow location to mutate.
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700391 private static final class StoredHost extends DefaultHost {
Madan Jampaniab994042014-10-13 15:34:04 -0700392 private Timestamped<HostLocation> location;
393
394 /**
395 * Creates an end-station host using the supplied information.
396 *
397 * @param providerId provider identity
398 * @param id host identifier
399 * @param mac host MAC address
400 * @param vlan host VLAN identifier
401 * @param location host location
402 * @param ips host IP addresses
403 * @param annotations optional key/value annotations
404 */
405 public StoredHost(ProviderId providerId, HostId id,
406 MacAddress mac, VlanId vlan, Timestamped<HostLocation> location,
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700407 Set<IpAddress> ips, Annotations... annotations) {
Madan Jampaniab994042014-10-13 15:34:04 -0700408 super(providerId, id, mac, vlan, location.value(), ips, annotations);
409 this.location = location;
410 }
411
412 void setLocation(Timestamped<HostLocation> location) {
413 this.location = location;
414 }
415
416 @Override
417 public HostLocation location() {
418 return location.value();
419 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700420
421 public Timestamp timestamp() {
422 return location.timestamp();
423 }
Madan Jampaniab994042014-10-13 15:34:04 -0700424 }
Madan Jampani312a2982014-10-14 21:07:16 -0700425
426 private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
427 broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event);
428 }
429
430 private void notifyPeers(InternalHostEvent event) throws IOException {
431 broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event);
432 }
433
434 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
435 ClusterMessage message = new ClusterMessage(
436 clusterService.getLocalNode().id(),
437 subject,
438 SERIALIZER.encode(event));
439 clusterCommunicator.broadcast(message);
440 }
Madan Jampani255618f2014-10-14 23:27:57 -0700441
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700442 private void unicastMessage(NodeId peer,
443 MessageSubject subject,
444 Object event) throws IOException {
445 ClusterMessage message = new ClusterMessage(
446 clusterService.getLocalNode().id(),
447 subject,
448 SERIALIZER.encode(event));
449 clusterCommunicator.unicast(message, peer);
450 }
451
Madan Jampani255618f2014-10-14 23:27:57 -0700452 private void notifyDelegateIfNotNull(HostEvent event) {
453 if (event != null) {
454 notifyDelegate(event);
455 }
456 }
457
458 private class InternalHostEventListener implements ClusterMessageHandler {
459 @Override
460 public void handle(ClusterMessage message) {
461
462 log.info("Received host update event from peer: {}", message.sender());
463 InternalHostEvent event = (InternalHostEvent) SERIALIZER.decode(message.payload());
464
465 ProviderId providerId = event.providerId();
466 HostId hostId = event.hostId();
467 HostDescription hostDescription = event.hostDescription();
468 Timestamp timestamp = event.timestamp();
469
470 notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp));
471 }
472 }
473
474 private class InternalHostRemovedEventListener implements ClusterMessageHandler {
475 @Override
476 public void handle(ClusterMessage message) {
477
478 log.info("Received host removed event from peer: {}", message.sender());
479 InternalHostRemovedEvent event = (InternalHostRemovedEvent) SERIALIZER.decode(message.payload());
480
481 HostId hostId = event.hostId();
482 Timestamp timestamp = event.timestamp();
483
484 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
485 }
486 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700487
488 private final class SendAdvertisementTask implements Runnable {
489
490 @Override
491 public void run() {
492 if (Thread.currentThread().isInterrupted()) {
493 log.info("Interrupted, quitting");
494 return;
495 }
496
497 try {
498 final NodeId self = clusterService.getLocalNode().id();
499 Set<ControllerNode> nodes = clusterService.getNodes();
500
501 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
502 .transform(toNodeId())
503 .toList();
504
505 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
506 log.debug("No other peers in the cluster.");
507 return;
508 }
509
510 NodeId peer;
511 do {
512 int idx = RandomUtils.nextInt(0, nodeIds.size());
513 peer = nodeIds.get(idx);
514 } while (peer.equals(self));
515
516 HostAntiEntropyAdvertisement ad = createAdvertisement();
517
518 if (Thread.currentThread().isInterrupted()) {
519 log.info("Interrupted, quitting");
520 return;
521 }
522
523 try {
524 unicastMessage(peer, GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
525 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -0700526 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700527 return;
528 }
529 } catch (Exception e) {
530 // catch all Exception to avoid Scheduled task being suppressed.
531 log.error("Exception thrown while sending advertisement", e);
532 }
533 }
534 }
535
536 private HostAntiEntropyAdvertisement createAdvertisement() {
537 final NodeId self = clusterService.getLocalNode().id();
538
539 Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
540 Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
541
542 for (Entry<HostId, StoredHost> e : hosts.entrySet()) {
543
544 final HostId hostId = e.getKey();
545 final StoredHost hostInfo = e.getValue();
546 final ProviderId providerId = hostInfo.providerId();
547 timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
548 }
549
550 for (Entry<HostId, Timestamped<Host>> e : removedHosts.entrySet()) {
551 tombstones.put(e.getKey(), e.getValue().timestamp());
552 }
553
554 return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
555 }
556
557 private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
558
559 final NodeId sender = ad.sender();
560
561 for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
562 // for each locally live Hosts...
563 final HostId hostId = host.getKey();
564 final StoredHost localHost = host.getValue();
565 final ProviderId providerId = localHost.providerId();
566 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
567 final Timestamp localLiveTimestamp = localHost.timestamp();
568
569 Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
570 if (remoteTimestamp == null) {
571 remoteTimestamp = ad.tombstones().get(hostId);
572 }
573 if (remoteTimestamp == null ||
574 localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
575
576 // local is more recent, push
577 // TODO: annotation is lost
578 final HostDescription desc = new DefaultHostDescription(
579 localHost.mac(),
580 localHost.vlan(),
581 localHost.location(),
582 localHost.ipAddresses());
583 try {
584 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED,
585 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
586 } catch (IOException e1) {
587 log.debug("Failed to send advertisement response", e1);
588 }
589 }
590
591 final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
592 if (remoteDeadTimestamp != null &&
593 remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
594 // sender has recent remove
595 notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
596 }
597 }
598
599 for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
600 // for each locally dead Hosts
601 final HostId hostId = dead.getKey();
602 final Timestamp localDeadTimestamp = dead.getValue().timestamp();
603
604 // TODO: pick proper ProviderId, when supporting multi-provider
605 final ProviderId providerId = dead.getValue().value().providerId();
606 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
607
608 final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
609 if (remoteLiveTimestamp != null &&
610 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
611 // sender has zombie, push
612 try {
613 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED,
614 new InternalHostRemovedEvent(hostId, localDeadTimestamp));
615 } catch (IOException e1) {
616 log.debug("Failed to send advertisement response", e1);
617 }
618 }
619 }
620
621
622 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
623 // for each remote tombstone advertisement...
624 final HostId hostId = e.getKey();
625 final Timestamp adRemoveTimestamp = e.getValue();
626
627 final StoredHost storedHost = hosts.get(hostId);
628 if (storedHost == null) {
629 continue;
630 }
631 if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
632 // sender has recent remove info, locally remove
633 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
634 }
635 }
636 }
637
638 private final class InternalHostAntiEntropyAdvertisementListener implements
639 ClusterMessageHandler {
640
641 @Override
642 public void handle(ClusterMessage message) {
643 log.debug("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
644 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
645 handleAntiEntropyAdvertisement(advertisement);
646 }
647 }
Madan Jampaniab994042014-10-13 15:34:04 -0700648}