blob: 8029e27c440ec378d8790b30e0b6041b3145c46e [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniab994042014-10-13 15:34:04 -070016package org.onlab.onos.store.host.impl;
17
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070018import com.google.common.collect.FluentIterable;
Madan Jampaniab994042014-10-13 15:34:04 -070019import com.google.common.collect.HashMultimap;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070020import com.google.common.collect.ImmutableList;
Madan Jampaniab994042014-10-13 15:34:04 -070021import com.google.common.collect.ImmutableSet;
22import com.google.common.collect.Multimap;
23import com.google.common.collect.Sets;
24
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070025import org.apache.commons.lang3.RandomUtils;
Madan Jampaniab994042014-10-13 15:34:04 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070033import org.onlab.onos.cluster.ControllerNode;
34import org.onlab.onos.cluster.NodeId;
Madan Jampaniab994042014-10-13 15:34:04 -070035import org.onlab.onos.net.Annotations;
36import org.onlab.onos.net.ConnectPoint;
37import org.onlab.onos.net.DefaultHost;
38import org.onlab.onos.net.DeviceId;
39import org.onlab.onos.net.Host;
40import org.onlab.onos.net.HostId;
41import org.onlab.onos.net.HostLocation;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070042import org.onlab.onos.net.host.DefaultHostDescription;
Madan Jampaniab994042014-10-13 15:34:04 -070043import org.onlab.onos.net.host.HostClockService;
44import org.onlab.onos.net.host.HostDescription;
45import org.onlab.onos.net.host.HostEvent;
46import org.onlab.onos.net.host.HostStore;
47import org.onlab.onos.net.host.HostStoreDelegate;
Pavlin Radoslavov76b0ae22014-10-27 15:33:19 -070048import org.onlab.onos.net.host.InterfaceIpAddress;
Madan Jampaniab994042014-10-13 15:34:04 -070049import org.onlab.onos.net.host.PortAddresses;
50import org.onlab.onos.net.provider.ProviderId;
51import org.onlab.onos.store.AbstractStore;
52import org.onlab.onos.store.Timestamp;
53import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani312a2982014-10-14 21:07:16 -070054import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Madan Jampani255618f2014-10-14 23:27:57 -070055import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani312a2982014-10-14 21:07:16 -070056import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIeecee552014-10-16 14:09:01 -070057import org.onlab.onos.store.impl.Timestamped;
Madan Jampani312a2982014-10-14 21:07:16 -070058import org.onlab.onos.store.serializers.DistributedStoreSerializers;
59import org.onlab.onos.store.serializers.KryoSerializer;
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -070060import org.onlab.packet.IpAddress;
Madan Jampaniab994042014-10-13 15:34:04 -070061import org.onlab.packet.MacAddress;
62import org.onlab.packet.VlanId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070063import org.onlab.util.KryoNamespace;
Madan Jampaniab994042014-10-13 15:34:04 -070064import org.slf4j.Logger;
65
Madan Jampani312a2982014-10-14 21:07:16 -070066import java.io.IOException;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070067import java.util.HashMap;
Madan Jampaniab994042014-10-13 15:34:04 -070068import java.util.HashSet;
69import java.util.Map;
70import java.util.Set;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070071import java.util.Map.Entry;
Madan Jampaniab994042014-10-13 15:34:04 -070072import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070073import java.util.concurrent.ScheduledExecutorService;
74import java.util.concurrent.TimeUnit;
Madan Jampaniab994042014-10-13 15:34:04 -070075
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070076import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
77import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Madan Jampaniab994042014-10-13 15:34:04 -070078import static org.onlab.onos.net.host.HostEvent.Type.*;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070079import static org.onlab.util.Tools.namedThreads;
Madan Jampaniab994042014-10-13 15:34:04 -070080import static org.slf4j.LoggerFactory.getLogger;
81
Yuta HIGUCHIa2639152014-10-14 15:08:10 -070082//TODO: multi-provider, annotation not supported.
Madan Jampaniab994042014-10-13 15:34:04 -070083/**
84 * Manages inventory of end-station hosts in distributed data store
85 * that uses optimistic replication and gossip based techniques.
86 */
87@Component(immediate = true)
88@Service
89public class GossipHostStore
90 extends AbstractStore<HostEvent, HostStoreDelegate>
91 implements HostStore {
92
93 private final Logger log = getLogger(getClass());
94
95 // Host inventory
96 private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
97
98 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
99
100 // Hosts tracked by their location
101 private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
102
103 private final Map<ConnectPoint, PortAddresses> portAddresses =
104 new ConcurrentHashMap<>();
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected HostClockService hostClockService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110 protected ClusterCommunicationService clusterCommunicator;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 protected ClusterService clusterService;
114
Madan Jampani312a2982014-10-14 21:07:16 -0700115 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
116 @Override
117 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700118 serializerPool = KryoNamespace.newBuilder()
Madan Jampani312a2982014-10-14 21:07:16 -0700119 .register(DistributedStoreSerializers.COMMON)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700120 .register(InternalHostEvent.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700121 .register(InternalHostRemovedEvent.class)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700122 .register(HostFragmentId.class)
123 .register(HostAntiEntropyAdvertisement.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700124 .build()
125 .populate(1);
126 }
127 };
128
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700129 private ScheduledExecutorService executor;
130
Madan Jampaniab994042014-10-13 15:34:04 -0700131 @Activate
132 public void activate() {
Madan Jampani255618f2014-10-14 23:27:57 -0700133 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700134 GossipHostStoreMessageSubjects.HOST_UPDATED,
135 new InternalHostEventListener());
Madan Jampani255618f2014-10-14 23:27:57 -0700136 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700137 GossipHostStoreMessageSubjects.HOST_REMOVED,
138 new InternalHostRemovedEventListener());
139 clusterCommunicator.addSubscriber(
140 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
141 new InternalHostAntiEntropyAdvertisementListener());
142
143 executor =
144 newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
145
146 // TODO: Make these configurable
147 long initialDelaySec = 5;
148 long periodSec = 5;
149 // start anti-entropy thread
150 executor.scheduleAtFixedRate(new SendAdvertisementTask(),
151 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani255618f2014-10-14 23:27:57 -0700152
Madan Jampaniab994042014-10-13 15:34:04 -0700153 log.info("Started");
154 }
155
156 @Deactivate
157 public void deactivate() {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700158 executor.shutdownNow();
159 try {
160 if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
161 log.error("Timeout during executor shutdown");
162 }
163 } catch (InterruptedException e) {
164 log.error("Error during executor shutdown", e);
165 }
166
167 hosts.clear();
168 removedHosts.clear();
169 locations.clear();
170 portAddresses.clear();
171
Madan Jampaniab994042014-10-13 15:34:04 -0700172 log.info("Stopped");
173 }
174
175 @Override
176 public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
177 HostDescription hostDescription) {
178 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700179 HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
180 if (event != null) {
181 log.info("Notifying peers of a host topology event for providerId: "
182 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
183 try {
184 notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
185 } catch (IOException e) {
186 log.error("Failed to notify peers of a host topology event for providerId: "
187 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
188 }
189 }
190 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700191 }
192
193 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
194 HostDescription hostDescription, Timestamp timestamp) {
195 StoredHost host = hosts.get(hostId);
196 if (host == null) {
197 return createHost(providerId, hostId, hostDescription, timestamp);
198 }
199 return updateHost(providerId, host, hostDescription, timestamp);
200 }
201
202 // creates a new host and sends HOST_ADDED
203 private HostEvent createHost(ProviderId providerId, HostId hostId,
204 HostDescription descr, Timestamp timestamp) {
205 synchronized (this) {
206 // If this host was previously removed, first ensure
207 // this new request is "newer"
208 if (removedHosts.containsKey(hostId)) {
209 if (removedHosts.get(hostId).isNewer(timestamp)) {
210 return null;
211 } else {
212 removedHosts.remove(hostId);
213 }
214 }
215 StoredHost newhost = new StoredHost(providerId, hostId,
216 descr.hwAddress(),
217 descr.vlan(),
218 new Timestamped<>(descr.location(), timestamp),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700219 ImmutableSet.copyOf(descr.ipAddress()));
Madan Jampaniab994042014-10-13 15:34:04 -0700220 hosts.put(hostId, newhost);
221 locations.put(descr.location(), newhost);
222 return new HostEvent(HOST_ADDED, newhost);
223 }
224 }
225
226 // checks for type of update to host, sends appropriate event
227 private HostEvent updateHost(ProviderId providerId, StoredHost host,
228 HostDescription descr, Timestamp timestamp) {
229 HostEvent event;
230 if (!host.location.isNewer(timestamp) && !host.location().equals(descr.location())) {
231 host.setLocation(new Timestamped<>(descr.location(), timestamp));
232 return new HostEvent(HOST_MOVED, host);
233 }
234
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700235 if (host.ipAddresses().containsAll(descr.ipAddress())) {
Madan Jampaniab994042014-10-13 15:34:04 -0700236 return null;
237 }
238
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700239 Set<IpAddress> addresses = new HashSet<>(host.ipAddresses());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700240 addresses.addAll(descr.ipAddress());
Madan Jampaniab994042014-10-13 15:34:04 -0700241 StoredHost updated = new StoredHost(providerId, host.id(),
242 host.mac(), host.vlan(),
243 host.location, addresses);
244 event = new HostEvent(HOST_UPDATED, updated);
245 synchronized (this) {
246 hosts.put(host.id(), updated);
247 locations.remove(host.location(), host);
248 locations.put(updated.location(), updated);
249 }
250 return event;
251 }
252
253 @Override
254 public HostEvent removeHost(HostId hostId) {
255 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700256 HostEvent event = removeHostInternal(hostId, timestamp);
257 if (event != null) {
258 log.info("Notifying peers of a host removed topology event for hostId: {}", hostId);
259 try {
260 notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
261 } catch (IOException e) {
262 log.info("Failed to notify peers of a host removed topology event for hostId: {}", hostId);
263 }
264 }
265 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700266 }
267
268 private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
269 synchronized (this) {
270 Host host = hosts.remove(hostId);
271 if (host != null) {
272 locations.remove((host.location()), host);
273 removedHosts.put(hostId, new Timestamped<>(host, timestamp));
274 return new HostEvent(HOST_REMOVED, host);
275 }
276 return null;
277 }
278 }
279
280 @Override
281 public int getHostCount() {
282 return hosts.size();
283 }
284
285 @Override
286 public Iterable<Host> getHosts() {
287 return ImmutableSet.<Host>copyOf(hosts.values());
288 }
289
290 @Override
291 public Host getHost(HostId hostId) {
292 return hosts.get(hostId);
293 }
294
295 @Override
296 public Set<Host> getHosts(VlanId vlanId) {
297 Set<Host> vlanset = new HashSet<>();
298 for (Host h : hosts.values()) {
299 if (h.vlan().equals(vlanId)) {
300 vlanset.add(h);
301 }
302 }
303 return vlanset;
304 }
305
306 @Override
307 public Set<Host> getHosts(MacAddress mac) {
308 Set<Host> macset = new HashSet<>();
309 for (Host h : hosts.values()) {
310 if (h.mac().equals(mac)) {
311 macset.add(h);
312 }
313 }
314 return macset;
315 }
316
317 @Override
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700318 public Set<Host> getHosts(IpAddress ip) {
Madan Jampaniab994042014-10-13 15:34:04 -0700319 Set<Host> ipset = new HashSet<>();
320 for (Host h : hosts.values()) {
321 if (h.ipAddresses().contains(ip)) {
322 ipset.add(h);
323 }
324 }
325 return ipset;
326 }
327
328 @Override
329 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
330 return ImmutableSet.copyOf(locations.get(connectPoint));
331 }
332
333 @Override
334 public Set<Host> getConnectedHosts(DeviceId deviceId) {
335 Set<Host> hostset = new HashSet<>();
336 for (ConnectPoint p : locations.keySet()) {
337 if (p.deviceId().equals(deviceId)) {
338 hostset.addAll(locations.get(p));
339 }
340 }
341 return hostset;
342 }
343
344 @Override
345 public void updateAddressBindings(PortAddresses addresses) {
346 synchronized (portAddresses) {
347 PortAddresses existing = portAddresses.get(addresses.connectPoint());
348 if (existing == null) {
349 portAddresses.put(addresses.connectPoint(), addresses);
350 } else {
Pavlin Radoslavov76b0ae22014-10-27 15:33:19 -0700351 Set<InterfaceIpAddress> union =
352 Sets.union(existing.ipAddresses(),
353 addresses.ipAddresses()).immutableCopy();
Madan Jampaniab994042014-10-13 15:34:04 -0700354
355 MacAddress newMac = (addresses.mac() == null) ? existing.mac()
356 : addresses.mac();
357
358 PortAddresses newAddresses =
359 new PortAddresses(addresses.connectPoint(), union, newMac);
360
361 portAddresses.put(newAddresses.connectPoint(), newAddresses);
362 }
363 }
364 }
365
366 @Override
367 public void removeAddressBindings(PortAddresses addresses) {
368 synchronized (portAddresses) {
369 PortAddresses existing = portAddresses.get(addresses.connectPoint());
370 if (existing != null) {
Pavlin Radoslavov76b0ae22014-10-27 15:33:19 -0700371 Set<InterfaceIpAddress> difference =
372 Sets.difference(existing.ipAddresses(),
373 addresses.ipAddresses()).immutableCopy();
Madan Jampaniab994042014-10-13 15:34:04 -0700374
375 // If they removed the existing mac, set the new mac to null.
376 // Otherwise, keep the existing mac.
377 MacAddress newMac = existing.mac();
378 if (addresses.mac() != null && addresses.mac().equals(existing.mac())) {
379 newMac = null;
380 }
381
382 PortAddresses newAddresses =
383 new PortAddresses(addresses.connectPoint(), difference, newMac);
384
385 portAddresses.put(newAddresses.connectPoint(), newAddresses);
386 }
387 }
388 }
389
390 @Override
391 public void clearAddressBindings(ConnectPoint connectPoint) {
392 synchronized (portAddresses) {
393 portAddresses.remove(connectPoint);
394 }
395 }
396
397 @Override
398 public Set<PortAddresses> getAddressBindings() {
399 synchronized (portAddresses) {
400 return new HashSet<>(portAddresses.values());
401 }
402 }
403
404 @Override
405 public PortAddresses getAddressBindingsForPort(ConnectPoint connectPoint) {
406 PortAddresses addresses;
407
408 synchronized (portAddresses) {
409 addresses = portAddresses.get(connectPoint);
410 }
411
412 if (addresses == null) {
413 addresses = new PortAddresses(connectPoint, null, null);
414 }
415
416 return addresses;
417 }
418
419 // Auxiliary extension to allow location to mutate.
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700420 private static final class StoredHost extends DefaultHost {
Madan Jampaniab994042014-10-13 15:34:04 -0700421 private Timestamped<HostLocation> location;
422
423 /**
424 * Creates an end-station host using the supplied information.
425 *
426 * @param providerId provider identity
427 * @param id host identifier
428 * @param mac host MAC address
429 * @param vlan host VLAN identifier
430 * @param location host location
431 * @param ips host IP addresses
432 * @param annotations optional key/value annotations
433 */
434 public StoredHost(ProviderId providerId, HostId id,
435 MacAddress mac, VlanId vlan, Timestamped<HostLocation> location,
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700436 Set<IpAddress> ips, Annotations... annotations) {
Madan Jampaniab994042014-10-13 15:34:04 -0700437 super(providerId, id, mac, vlan, location.value(), ips, annotations);
438 this.location = location;
439 }
440
441 void setLocation(Timestamped<HostLocation> location) {
442 this.location = location;
443 }
444
445 @Override
446 public HostLocation location() {
447 return location.value();
448 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700449
450 public Timestamp timestamp() {
451 return location.timestamp();
452 }
Madan Jampaniab994042014-10-13 15:34:04 -0700453 }
Madan Jampani312a2982014-10-14 21:07:16 -0700454
455 private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
456 broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event);
457 }
458
459 private void notifyPeers(InternalHostEvent event) throws IOException {
460 broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event);
461 }
462
463 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
464 ClusterMessage message = new ClusterMessage(
465 clusterService.getLocalNode().id(),
466 subject,
467 SERIALIZER.encode(event));
468 clusterCommunicator.broadcast(message);
469 }
Madan Jampani255618f2014-10-14 23:27:57 -0700470
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700471 private void unicastMessage(NodeId peer,
472 MessageSubject subject,
473 Object event) throws IOException {
474 ClusterMessage message = new ClusterMessage(
475 clusterService.getLocalNode().id(),
476 subject,
477 SERIALIZER.encode(event));
478 clusterCommunicator.unicast(message, peer);
479 }
480
Madan Jampani255618f2014-10-14 23:27:57 -0700481 private void notifyDelegateIfNotNull(HostEvent event) {
482 if (event != null) {
483 notifyDelegate(event);
484 }
485 }
486
487 private class InternalHostEventListener implements ClusterMessageHandler {
488 @Override
489 public void handle(ClusterMessage message) {
490
491 log.info("Received host update event from peer: {}", message.sender());
492 InternalHostEvent event = (InternalHostEvent) SERIALIZER.decode(message.payload());
493
494 ProviderId providerId = event.providerId();
495 HostId hostId = event.hostId();
496 HostDescription hostDescription = event.hostDescription();
497 Timestamp timestamp = event.timestamp();
498
499 notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp));
500 }
501 }
502
503 private class InternalHostRemovedEventListener implements ClusterMessageHandler {
504 @Override
505 public void handle(ClusterMessage message) {
506
507 log.info("Received host removed event from peer: {}", message.sender());
508 InternalHostRemovedEvent event = (InternalHostRemovedEvent) SERIALIZER.decode(message.payload());
509
510 HostId hostId = event.hostId();
511 Timestamp timestamp = event.timestamp();
512
513 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
514 }
515 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700516
517 private final class SendAdvertisementTask implements Runnable {
518
519 @Override
520 public void run() {
521 if (Thread.currentThread().isInterrupted()) {
522 log.info("Interrupted, quitting");
523 return;
524 }
525
526 try {
527 final NodeId self = clusterService.getLocalNode().id();
528 Set<ControllerNode> nodes = clusterService.getNodes();
529
530 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
531 .transform(toNodeId())
532 .toList();
533
534 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
535 log.debug("No other peers in the cluster.");
536 return;
537 }
538
539 NodeId peer;
540 do {
541 int idx = RandomUtils.nextInt(0, nodeIds.size());
542 peer = nodeIds.get(idx);
543 } while (peer.equals(self));
544
545 HostAntiEntropyAdvertisement ad = createAdvertisement();
546
547 if (Thread.currentThread().isInterrupted()) {
548 log.info("Interrupted, quitting");
549 return;
550 }
551
552 try {
553 unicastMessage(peer, GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
554 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -0700555 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700556 return;
557 }
558 } catch (Exception e) {
559 // catch all Exception to avoid Scheduled task being suppressed.
560 log.error("Exception thrown while sending advertisement", e);
561 }
562 }
563 }
564
565 private HostAntiEntropyAdvertisement createAdvertisement() {
566 final NodeId self = clusterService.getLocalNode().id();
567
568 Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
569 Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
570
571 for (Entry<HostId, StoredHost> e : hosts.entrySet()) {
572
573 final HostId hostId = e.getKey();
574 final StoredHost hostInfo = e.getValue();
575 final ProviderId providerId = hostInfo.providerId();
576 timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
577 }
578
579 for (Entry<HostId, Timestamped<Host>> e : removedHosts.entrySet()) {
580 tombstones.put(e.getKey(), e.getValue().timestamp());
581 }
582
583 return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
584 }
585
586 private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
587
588 final NodeId sender = ad.sender();
589
590 for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
591 // for each locally live Hosts...
592 final HostId hostId = host.getKey();
593 final StoredHost localHost = host.getValue();
594 final ProviderId providerId = localHost.providerId();
595 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
596 final Timestamp localLiveTimestamp = localHost.timestamp();
597
598 Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
599 if (remoteTimestamp == null) {
600 remoteTimestamp = ad.tombstones().get(hostId);
601 }
602 if (remoteTimestamp == null ||
603 localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
604
605 // local is more recent, push
606 // TODO: annotation is lost
607 final HostDescription desc = new DefaultHostDescription(
608 localHost.mac(),
609 localHost.vlan(),
610 localHost.location(),
611 localHost.ipAddresses());
612 try {
613 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED,
614 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
615 } catch (IOException e1) {
616 log.debug("Failed to send advertisement response", e1);
617 }
618 }
619
620 final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
621 if (remoteDeadTimestamp != null &&
622 remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
623 // sender has recent remove
624 notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
625 }
626 }
627
628 for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
629 // for each locally dead Hosts
630 final HostId hostId = dead.getKey();
631 final Timestamp localDeadTimestamp = dead.getValue().timestamp();
632
633 // TODO: pick proper ProviderId, when supporting multi-provider
634 final ProviderId providerId = dead.getValue().value().providerId();
635 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
636
637 final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
638 if (remoteLiveTimestamp != null &&
639 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
640 // sender has zombie, push
641 try {
642 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED,
643 new InternalHostRemovedEvent(hostId, localDeadTimestamp));
644 } catch (IOException e1) {
645 log.debug("Failed to send advertisement response", e1);
646 }
647 }
648 }
649
650
651 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
652 // for each remote tombstone advertisement...
653 final HostId hostId = e.getKey();
654 final Timestamp adRemoveTimestamp = e.getValue();
655
656 final StoredHost storedHost = hosts.get(hostId);
657 if (storedHost == null) {
658 continue;
659 }
660 if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
661 // sender has recent remove info, locally remove
662 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
663 }
664 }
665 }
666
667 private final class InternalHostAntiEntropyAdvertisementListener implements
668 ClusterMessageHandler {
669
670 @Override
671 public void handle(ClusterMessage message) {
672 log.debug("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
673 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
674 handleAntiEntropyAdvertisement(advertisement);
675 }
676 }
Madan Jampaniab994042014-10-13 15:34:04 -0700677}