blob: 5353a912cca27b8aaaaac6de99e412f7909e30c5 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniab994042014-10-13 15:34:04 -070016package org.onlab.onos.store.host.impl;
17
Jonathan Harta887ba82014-11-03 15:20:52 -080018import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
19import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
20import static org.onlab.onos.net.host.HostEvent.Type.HOST_ADDED;
21import static org.onlab.onos.net.host.HostEvent.Type.HOST_MOVED;
22import static org.onlab.onos.net.host.HostEvent.Type.HOST_REMOVED;
23import static org.onlab.onos.net.host.HostEvent.Type.HOST_UPDATED;
24import static org.onlab.util.Tools.namedThreads;
25import static org.slf4j.LoggerFactory.getLogger;
26
27import java.io.IOException;
28import java.util.Collections;
29import java.util.HashMap;
30import java.util.HashSet;
31import java.util.Map;
32import java.util.Map.Entry;
33import java.util.Set;
34import java.util.concurrent.ConcurrentHashMap;
35import java.util.concurrent.ScheduledExecutorService;
36import java.util.concurrent.TimeUnit;
Madan Jampaniab994042014-10-13 15:34:04 -070037
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070038import org.apache.commons.lang3.RandomUtils;
Madan Jampaniab994042014-10-13 15:34:04 -070039import org.apache.felix.scr.annotations.Activate;
40import org.apache.felix.scr.annotations.Component;
41import org.apache.felix.scr.annotations.Deactivate;
42import org.apache.felix.scr.annotations.Reference;
43import org.apache.felix.scr.annotations.ReferenceCardinality;
44import org.apache.felix.scr.annotations.Service;
45import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070046import org.onlab.onos.cluster.ControllerNode;
47import org.onlab.onos.cluster.NodeId;
Madan Jampaniab994042014-10-13 15:34:04 -070048import org.onlab.onos.net.Annotations;
49import org.onlab.onos.net.ConnectPoint;
50import org.onlab.onos.net.DefaultHost;
51import org.onlab.onos.net.DeviceId;
52import org.onlab.onos.net.Host;
53import org.onlab.onos.net.HostId;
54import org.onlab.onos.net.HostLocation;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070055import org.onlab.onos.net.host.DefaultHostDescription;
Madan Jampaniab994042014-10-13 15:34:04 -070056import org.onlab.onos.net.host.HostClockService;
57import org.onlab.onos.net.host.HostDescription;
58import org.onlab.onos.net.host.HostEvent;
59import org.onlab.onos.net.host.HostStore;
60import org.onlab.onos.net.host.HostStoreDelegate;
61import org.onlab.onos.net.host.PortAddresses;
62import org.onlab.onos.net.provider.ProviderId;
63import org.onlab.onos.store.AbstractStore;
64import org.onlab.onos.store.Timestamp;
65import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani312a2982014-10-14 21:07:16 -070066import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Madan Jampani255618f2014-10-14 23:27:57 -070067import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani312a2982014-10-14 21:07:16 -070068import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIeecee552014-10-16 14:09:01 -070069import org.onlab.onos.store.impl.Timestamped;
Madan Jampani312a2982014-10-14 21:07:16 -070070import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI60a190b2014-11-07 16:24:47 -080071import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -070072import org.onlab.packet.IpAddress;
Madan Jampaniab994042014-10-13 15:34:04 -070073import org.onlab.packet.MacAddress;
74import org.onlab.packet.VlanId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070075import org.onlab.util.KryoNamespace;
Madan Jampaniab994042014-10-13 15:34:04 -070076import org.slf4j.Logger;
77
Jonathan Harta887ba82014-11-03 15:20:52 -080078import com.google.common.collect.FluentIterable;
79import com.google.common.collect.HashMultimap;
80import com.google.common.collect.ImmutableList;
81import com.google.common.collect.ImmutableSet;
82import com.google.common.collect.Multimap;
83import com.google.common.collect.Multimaps;
84import com.google.common.collect.SetMultimap;
Madan Jampaniab994042014-10-13 15:34:04 -070085
Yuta HIGUCHIa2639152014-10-14 15:08:10 -070086//TODO: multi-provider, annotation not supported.
Madan Jampaniab994042014-10-13 15:34:04 -070087/**
88 * Manages inventory of end-station hosts in distributed data store
89 * that uses optimistic replication and gossip based techniques.
90 */
91@Component(immediate = true)
92@Service
93public class GossipHostStore
94 extends AbstractStore<HostEvent, HostStoreDelegate>
95 implements HostStore {
96
97 private final Logger log = getLogger(getClass());
98
99 // Host inventory
100 private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
101
102 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
103
104 // Hosts tracked by their location
105 private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
106
Jonathan Harta887ba82014-11-03 15:20:52 -0800107 private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
108 Multimaps.synchronizedSetMultimap(
109 HashMultimap.<ConnectPoint, PortAddresses>create());
Madan Jampaniab994042014-10-13 15:34:04 -0700110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
112 protected HostClockService hostClockService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
115 protected ClusterCommunicationService clusterCommunicator;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
118 protected ClusterService clusterService;
119
Madan Jampani312a2982014-10-14 21:07:16 -0700120 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
121 @Override
122 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700123 serializerPool = KryoNamespace.newBuilder()
Madan Jampani312a2982014-10-14 21:07:16 -0700124 .register(DistributedStoreSerializers.COMMON)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700125 .register(InternalHostEvent.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700126 .register(InternalHostRemovedEvent.class)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700127 .register(HostFragmentId.class)
128 .register(HostAntiEntropyAdvertisement.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700129 .build()
130 .populate(1);
131 }
132 };
133
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700134 private ScheduledExecutorService executor;
135
Madan Jampaniab994042014-10-13 15:34:04 -0700136 @Activate
137 public void activate() {
Madan Jampani255618f2014-10-14 23:27:57 -0700138 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700139 GossipHostStoreMessageSubjects.HOST_UPDATED,
140 new InternalHostEventListener());
Madan Jampani255618f2014-10-14 23:27:57 -0700141 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700142 GossipHostStoreMessageSubjects.HOST_REMOVED,
143 new InternalHostRemovedEventListener());
144 clusterCommunicator.addSubscriber(
145 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
146 new InternalHostAntiEntropyAdvertisementListener());
147
148 executor =
149 newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
150
151 // TODO: Make these configurable
152 long initialDelaySec = 5;
153 long periodSec = 5;
154 // start anti-entropy thread
155 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
156 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani255618f2014-10-14 23:27:57 -0700157
Madan Jampaniab994042014-10-13 15:34:04 -0700158 log.info("Started");
159 }
160
161 @Deactivate
162 public void deactivate() {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700163 executor.shutdownNow();
164 try {
165 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
166 log.error("Timeout during executor shutdown");
167 }
168 } catch (InterruptedException e) {
169 log.error("Error during executor shutdown", e);
170 }
171
172 hosts.clear();
173 removedHosts.clear();
174 locations.clear();
175 portAddresses.clear();
176
Madan Jampaniab994042014-10-13 15:34:04 -0700177 log.info("Stopped");
178 }
179
180 @Override
181 public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
182 HostDescription hostDescription) {
183 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700184 HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
185 if (event != null) {
186 log.info("Notifying peers of a host topology event for providerId: "
187 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
188 try {
189 notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
190 } catch (IOException e) {
191 log.error("Failed to notify peers of a host topology event for providerId: "
192 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
193 }
194 }
195 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700196 }
197
198 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
199 HostDescription hostDescription, Timestamp timestamp) {
200 StoredHost host = hosts.get(hostId);
201 if (host == null) {
202 return createHost(providerId, hostId, hostDescription, timestamp);
203 }
204 return updateHost(providerId, host, hostDescription, timestamp);
205 }
206
207 // creates a new host and sends HOST_ADDED
208 private HostEvent createHost(ProviderId providerId, HostId hostId,
209 HostDescription descr, Timestamp timestamp) {
210 synchronized (this) {
211 // If this host was previously removed, first ensure
212 // this new request is "newer"
213 if (removedHosts.containsKey(hostId)) {
214 if (removedHosts.get(hostId).isNewer(timestamp)) {
215 return null;
216 } else {
217 removedHosts.remove(hostId);
218 }
219 }
220 StoredHost newhost = new StoredHost(providerId, hostId,
221 descr.hwAddress(),
222 descr.vlan(),
223 new Timestamped<>(descr.location(), timestamp),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700224 ImmutableSet.copyOf(descr.ipAddress()));
Madan Jampaniab994042014-10-13 15:34:04 -0700225 hosts.put(hostId, newhost);
226 locations.put(descr.location(), newhost);
227 return new HostEvent(HOST_ADDED, newhost);
228 }
229 }
230
231 // checks for type of update to host, sends appropriate event
232 private HostEvent updateHost(ProviderId providerId, StoredHost host,
233 HostDescription descr, Timestamp timestamp) {
234 HostEvent event;
235 if (!host.location.isNewer(timestamp) && !host.location().equals(descr.location())) {
236 host.setLocation(new Timestamped<>(descr.location(), timestamp));
237 return new HostEvent(HOST_MOVED, host);
238 }
239
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700240 if (host.ipAddresses().containsAll(descr.ipAddress())) {
Madan Jampaniab994042014-10-13 15:34:04 -0700241 return null;
242 }
243
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700244 Set<IpAddress> addresses = new HashSet<>(host.ipAddresses());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700245 addresses.addAll(descr.ipAddress());
Madan Jampaniab994042014-10-13 15:34:04 -0700246 StoredHost updated = new StoredHost(providerId, host.id(),
247 host.mac(), host.vlan(),
248 host.location, addresses);
249 event = new HostEvent(HOST_UPDATED, updated);
250 synchronized (this) {
251 hosts.put(host.id(), updated);
252 locations.remove(host.location(), host);
253 locations.put(updated.location(), updated);
254 }
255 return event;
256 }
257
258 @Override
259 public HostEvent removeHost(HostId hostId) {
260 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700261 HostEvent event = removeHostInternal(hostId, timestamp);
262 if (event != null) {
263 log.info("Notifying peers of a host removed topology event for hostId: {}", hostId);
264 try {
265 notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
266 } catch (IOException e) {
267 log.info("Failed to notify peers of a host removed topology event for hostId: {}", hostId);
268 }
269 }
270 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700271 }
272
273 private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
274 synchronized (this) {
275 Host host = hosts.remove(hostId);
276 if (host != null) {
277 locations.remove((host.location()), host);
278 removedHosts.put(hostId, new Timestamped<>(host, timestamp));
279 return new HostEvent(HOST_REMOVED, host);
280 }
281 return null;
282 }
283 }
284
285 @Override
286 public int getHostCount() {
287 return hosts.size();
288 }
289
290 @Override
291 public Iterable<Host> getHosts() {
292 return ImmutableSet.<Host>copyOf(hosts.values());
293 }
294
295 @Override
296 public Host getHost(HostId hostId) {
297 return hosts.get(hostId);
298 }
299
300 @Override
301 public Set<Host> getHosts(VlanId vlanId) {
302 Set<Host> vlanset = new HashSet<>();
303 for (Host h : hosts.values()) {
304 if (h.vlan().equals(vlanId)) {
305 vlanset.add(h);
306 }
307 }
308 return vlanset;
309 }
310
311 @Override
312 public Set<Host> getHosts(MacAddress mac) {
313 Set<Host> macset = new HashSet<>();
314 for (Host h : hosts.values()) {
315 if (h.mac().equals(mac)) {
316 macset.add(h);
317 }
318 }
319 return macset;
320 }
321
322 @Override
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700323 public Set<Host> getHosts(IpAddress ip) {
Madan Jampaniab994042014-10-13 15:34:04 -0700324 Set<Host> ipset = new HashSet<>();
325 for (Host h : hosts.values()) {
326 if (h.ipAddresses().contains(ip)) {
327 ipset.add(h);
328 }
329 }
330 return ipset;
331 }
332
333 @Override
334 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
335 return ImmutableSet.copyOf(locations.get(connectPoint));
336 }
337
338 @Override
339 public Set<Host> getConnectedHosts(DeviceId deviceId) {
340 Set<Host> hostset = new HashSet<>();
341 for (ConnectPoint p : locations.keySet()) {
342 if (p.deviceId().equals(deviceId)) {
343 hostset.addAll(locations.get(p));
344 }
345 }
346 return hostset;
347 }
348
349 @Override
350 public void updateAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800351 portAddresses.put(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700352 }
353
354 @Override
355 public void removeAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800356 portAddresses.remove(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700357 }
358
359 @Override
360 public void clearAddressBindings(ConnectPoint connectPoint) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800361 portAddresses.removeAll(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700362 }
363
364 @Override
365 public Set<PortAddresses> getAddressBindings() {
366 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800367 return ImmutableSet.copyOf(portAddresses.values());
Madan Jampaniab994042014-10-13 15:34:04 -0700368 }
369 }
370
371 @Override
Jonathan Harta887ba82014-11-03 15:20:52 -0800372 public Set<PortAddresses> getAddressBindingsForPort(ConnectPoint connectPoint) {
Madan Jampaniab994042014-10-13 15:34:04 -0700373 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800374 Set<PortAddresses> addresses = portAddresses.get(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700375
Jonathan Harta887ba82014-11-03 15:20:52 -0800376 if (addresses == null) {
377 return Collections.emptySet();
378 } else {
379 return ImmutableSet.copyOf(addresses);
380 }
Madan Jampaniab994042014-10-13 15:34:04 -0700381 }
Madan Jampaniab994042014-10-13 15:34:04 -0700382 }
383
384 // Auxiliary extension to allow location to mutate.
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700385 private static final class StoredHost extends DefaultHost {
Madan Jampaniab994042014-10-13 15:34:04 -0700386 private Timestamped<HostLocation> location;
387
388 /**
389 * Creates an end-station host using the supplied information.
390 *
391 * @param providerId provider identity
392 * @param id host identifier
393 * @param mac host MAC address
394 * @param vlan host VLAN identifier
395 * @param location host location
396 * @param ips host IP addresses
397 * @param annotations optional key/value annotations
398 */
399 public StoredHost(ProviderId providerId, HostId id,
400 MacAddress mac, VlanId vlan, Timestamped<HostLocation> location,
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700401 Set<IpAddress> ips, Annotations... annotations) {
Madan Jampaniab994042014-10-13 15:34:04 -0700402 super(providerId, id, mac, vlan, location.value(), ips, annotations);
403 this.location = location;
404 }
405
406 void setLocation(Timestamped<HostLocation> location) {
407 this.location = location;
408 }
409
410 @Override
411 public HostLocation location() {
412 return location.value();
413 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700414
415 public Timestamp timestamp() {
416 return location.timestamp();
417 }
Madan Jampaniab994042014-10-13 15:34:04 -0700418 }
Madan Jampani312a2982014-10-14 21:07:16 -0700419
420 private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
421 broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event);
422 }
423
424 private void notifyPeers(InternalHostEvent event) throws IOException {
425 broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event);
426 }
427
428 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
429 ClusterMessage message = new ClusterMessage(
430 clusterService.getLocalNode().id(),
431 subject,
432 SERIALIZER.encode(event));
433 clusterCommunicator.broadcast(message);
434 }
Madan Jampani255618f2014-10-14 23:27:57 -0700435
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700436 private void unicastMessage(NodeId peer,
437 MessageSubject subject,
438 Object event) throws IOException {
439 ClusterMessage message = new ClusterMessage(
440 clusterService.getLocalNode().id(),
441 subject,
442 SERIALIZER.encode(event));
443 clusterCommunicator.unicast(message, peer);
444 }
445
Madan Jampani255618f2014-10-14 23:27:57 -0700446 private void notifyDelegateIfNotNull(HostEvent event) {
447 if (event != null) {
448 notifyDelegate(event);
449 }
450 }
451
452 private class InternalHostEventListener implements ClusterMessageHandler {
453 @Override
454 public void handle(ClusterMessage message) {
455
456 log.info("Received host update event from peer: {}", message.sender());
457 InternalHostEvent event = (InternalHostEvent) SERIALIZER.decode(message.payload());
458
459 ProviderId providerId = event.providerId();
460 HostId hostId = event.hostId();
461 HostDescription hostDescription = event.hostDescription();
462 Timestamp timestamp = event.timestamp();
463
464 notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp));
465 }
466 }
467
468 private class InternalHostRemovedEventListener implements ClusterMessageHandler {
469 @Override
470 public void handle(ClusterMessage message) {
471
472 log.info("Received host removed event from peer: {}", message.sender());
473 InternalHostRemovedEvent event = (InternalHostRemovedEvent) SERIALIZER.decode(message.payload());
474
475 HostId hostId = event.hostId();
476 Timestamp timestamp = event.timestamp();
477
478 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
479 }
480 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700481
482 private final class SendAdvertisementTask implements Runnable {
483
484 @Override
485 public void run() {
486 if (Thread.currentThread().isInterrupted()) {
487 log.info("Interrupted, quitting");
488 return;
489 }
490
491 try {
492 final NodeId self = clusterService.getLocalNode().id();
493 Set<ControllerNode> nodes = clusterService.getNodes();
494
495 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
496 .transform(toNodeId())
497 .toList();
498
499 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
500 log.debug("No other peers in the cluster.");
501 return;
502 }
503
504 NodeId peer;
505 do {
506 int idx = RandomUtils.nextInt(0, nodeIds.size());
507 peer = nodeIds.get(idx);
508 } while (peer.equals(self));
509
510 HostAntiEntropyAdvertisement ad = createAdvertisement();
511
512 if (Thread.currentThread().isInterrupted()) {
513 log.info("Interrupted, quitting");
514 return;
515 }
516
517 try {
518 unicastMessage(peer, GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
519 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -0700520 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700521 return;
522 }
523 } catch (Exception e) {
524 // catch all Exception to avoid Scheduled task being suppressed.
525 log.error("Exception thrown while sending advertisement", e);
526 }
527 }
528 }
529
530 private HostAntiEntropyAdvertisement createAdvertisement() {
531 final NodeId self = clusterService.getLocalNode().id();
532
533 Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
534 Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
535
536 for (Entry<HostId, StoredHost> e : hosts.entrySet()) {
537
538 final HostId hostId = e.getKey();
539 final StoredHost hostInfo = e.getValue();
540 final ProviderId providerId = hostInfo.providerId();
541 timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
542 }
543
544 for (Entry<HostId, Timestamped<Host>> e : removedHosts.entrySet()) {
545 tombstones.put(e.getKey(), e.getValue().timestamp());
546 }
547
548 return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
549 }
550
551 private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
552
553 final NodeId sender = ad.sender();
554
555 for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
556 // for each locally live Hosts...
557 final HostId hostId = host.getKey();
558 final StoredHost localHost = host.getValue();
559 final ProviderId providerId = localHost.providerId();
560 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
561 final Timestamp localLiveTimestamp = localHost.timestamp();
562
563 Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
564 if (remoteTimestamp == null) {
565 remoteTimestamp = ad.tombstones().get(hostId);
566 }
567 if (remoteTimestamp == null ||
568 localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
569
570 // local is more recent, push
571 // TODO: annotation is lost
572 final HostDescription desc = new DefaultHostDescription(
573 localHost.mac(),
574 localHost.vlan(),
575 localHost.location(),
576 localHost.ipAddresses());
577 try {
578 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED,
579 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
580 } catch (IOException e1) {
581 log.debug("Failed to send advertisement response", e1);
582 }
583 }
584
585 final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
586 if (remoteDeadTimestamp != null &&
587 remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
588 // sender has recent remove
589 notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
590 }
591 }
592
593 for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
594 // for each locally dead Hosts
595 final HostId hostId = dead.getKey();
596 final Timestamp localDeadTimestamp = dead.getValue().timestamp();
597
598 // TODO: pick proper ProviderId, when supporting multi-provider
599 final ProviderId providerId = dead.getValue().value().providerId();
600 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
601
602 final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
603 if (remoteLiveTimestamp != null &&
604 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
605 // sender has zombie, push
606 try {
607 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED,
608 new InternalHostRemovedEvent(hostId, localDeadTimestamp));
609 } catch (IOException e1) {
610 log.debug("Failed to send advertisement response", e1);
611 }
612 }
613 }
614
615
616 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
617 // for each remote tombstone advertisement...
618 final HostId hostId = e.getKey();
619 final Timestamp adRemoveTimestamp = e.getValue();
620
621 final StoredHost storedHost = hosts.get(hostId);
622 if (storedHost == null) {
623 continue;
624 }
625 if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
626 // sender has recent remove info, locally remove
627 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
628 }
629 }
630 }
631
632 private final class InternalHostAntiEntropyAdvertisementListener implements
633 ClusterMessageHandler {
634
635 @Override
636 public void handle(ClusterMessage message) {
637 log.debug("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
638 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
639 handleAntiEntropyAdvertisement(advertisement);
640 }
641 }
Madan Jampaniab994042014-10-13 15:34:04 -0700642}