blob: 10bf9ecdf8a1d2111ba8ccb9e81bf5d58c7347c7 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.host.impl;
Madan Jampaniab994042014-10-13 15:34:04 -070017
Jonathan Harta887ba82014-11-03 15:20:52 -080018import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorabafb502014-12-02 22:26:20 -080019import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
20import static org.onosproject.net.DefaultAnnotations.merge;
21import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
22import static org.onosproject.net.host.HostEvent.Type.HOST_MOVED;
23import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
24import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED;
Jonathan Harta887ba82014-11-03 15:20:52 -080025import static org.onlab.util.Tools.namedThreads;
Yuta HIGUCHI06586272014-11-25 14:27:03 -080026import static org.onlab.util.Tools.minPriority;
Jonathan Harta887ba82014-11-03 15:20:52 -080027import static org.slf4j.LoggerFactory.getLogger;
28
29import java.io.IOException;
30import java.util.Collections;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Map;
34import java.util.Map.Entry;
35import java.util.Set;
36import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHI80d56592014-11-25 15:11:13 -080037import java.util.concurrent.ExecutorService;
38import java.util.concurrent.Executors;
Jonathan Harta887ba82014-11-03 15:20:52 -080039import java.util.concurrent.ScheduledExecutorService;
40import java.util.concurrent.TimeUnit;
Madan Jampaniab994042014-10-13 15:34:04 -070041
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070042import org.apache.commons.lang3.RandomUtils;
Madan Jampaniab994042014-10-13 15:34:04 -070043import org.apache.felix.scr.annotations.Activate;
44import org.apache.felix.scr.annotations.Component;
45import org.apache.felix.scr.annotations.Deactivate;
46import org.apache.felix.scr.annotations.Reference;
47import org.apache.felix.scr.annotations.ReferenceCardinality;
48import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080049import org.onosproject.cluster.ClusterService;
50import org.onosproject.cluster.ControllerNode;
51import org.onosproject.cluster.NodeId;
52import org.onosproject.net.Annotations;
53import org.onosproject.net.ConnectPoint;
54import org.onosproject.net.DefaultAnnotations;
55import org.onosproject.net.DefaultHost;
56import org.onosproject.net.DeviceId;
57import org.onosproject.net.Host;
58import org.onosproject.net.HostId;
59import org.onosproject.net.HostLocation;
60import org.onosproject.net.host.DefaultHostDescription;
61import org.onosproject.net.host.HostClockService;
62import org.onosproject.net.host.HostDescription;
63import org.onosproject.net.host.HostEvent;
64import org.onosproject.net.host.HostStore;
65import org.onosproject.net.host.HostStoreDelegate;
66import org.onosproject.net.host.PortAddresses;
67import org.onosproject.net.provider.ProviderId;
68import org.onosproject.store.AbstractStore;
69import org.onosproject.store.Timestamp;
70import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
71import org.onosproject.store.cluster.messaging.ClusterMessage;
72import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
73import org.onosproject.store.cluster.messaging.MessageSubject;
74import org.onosproject.store.impl.Timestamped;
75import org.onosproject.store.serializers.KryoSerializer;
76import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -070077import org.onlab.packet.IpAddress;
Madan Jampaniab994042014-10-13 15:34:04 -070078import org.onlab.packet.MacAddress;
79import org.onlab.packet.VlanId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070080import org.onlab.util.KryoNamespace;
Madan Jampaniab994042014-10-13 15:34:04 -070081import org.slf4j.Logger;
82
Jonathan Harta887ba82014-11-03 15:20:52 -080083import com.google.common.collect.FluentIterable;
84import com.google.common.collect.HashMultimap;
85import com.google.common.collect.ImmutableList;
86import com.google.common.collect.ImmutableSet;
87import com.google.common.collect.Multimap;
88import com.google.common.collect.Multimaps;
89import com.google.common.collect.SetMultimap;
Madan Jampaniab994042014-10-13 15:34:04 -070090
Madan Jampaniab994042014-10-13 15:34:04 -070091/**
92 * Manages inventory of end-station hosts in distributed data store
93 * that uses optimistic replication and gossip based techniques.
94 */
95@Component(immediate = true)
96@Service
97public class GossipHostStore
98 extends AbstractStore<HostEvent, HostStoreDelegate>
99 implements HostStore {
100
101 private final Logger log = getLogger(getClass());
102
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800103 // TODO: make this configurable
104 private int hostsExpected = 2000000;
Madan Jampaniab994042014-10-13 15:34:04 -0700105
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800106 // Host inventory
107 private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
108
109 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
Madan Jampaniab994042014-10-13 15:34:04 -0700110
111 // Hosts tracked by their location
112 private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
113
Jonathan Harta887ba82014-11-03 15:20:52 -0800114 private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
115 Multimaps.synchronizedSetMultimap(
116 HashMultimap.<ConnectPoint, PortAddresses>create());
Madan Jampaniab994042014-10-13 15:34:04 -0700117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
119 protected HostClockService hostClockService;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
122 protected ClusterCommunicationService clusterCommunicator;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected ClusterService clusterService;
126
Madan Jampani312a2982014-10-14 21:07:16 -0700127 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
128 @Override
129 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700130 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800131 .register(DistributedStoreSerializers.STORE_COMMON)
132 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700133 .register(InternalHostEvent.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700134 .register(InternalHostRemovedEvent.class)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700135 .register(HostFragmentId.class)
136 .register(HostAntiEntropyAdvertisement.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800137 .build();
Madan Jampani312a2982014-10-14 21:07:16 -0700138 }
139 };
140
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800141 private ExecutorService executor;
142
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800143 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700144
Madan Jampaniab994042014-10-13 15:34:04 -0700145 @Activate
146 public void activate() {
Madan Jampani255618f2014-10-14 23:27:57 -0700147 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700148 GossipHostStoreMessageSubjects.HOST_UPDATED,
149 new InternalHostEventListener());
Madan Jampani255618f2014-10-14 23:27:57 -0700150 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700151 GossipHostStoreMessageSubjects.HOST_REMOVED,
152 new InternalHostRemovedEventListener());
153 clusterCommunicator.addSubscriber(
154 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
155 new InternalHostAntiEntropyAdvertisementListener());
156
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800157 executor = Executors.newCachedThreadPool(namedThreads("host-fg-%d"));
158
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800159 backgroundExecutor =
160 newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d")));
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700161
162 // TODO: Make these configurable
163 long initialDelaySec = 5;
164 long periodSec = 5;
165 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800166 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700167 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani255618f2014-10-14 23:27:57 -0700168
Madan Jampaniab994042014-10-13 15:34:04 -0700169 log.info("Started");
170 }
171
172 @Deactivate
173 public void deactivate() {
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800174 executor.shutdownNow();
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800175 backgroundExecutor.shutdownNow();
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700176 try {
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800177 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700178 log.error("Timeout during executor shutdown");
179 }
180 } catch (InterruptedException e) {
181 log.error("Error during executor shutdown", e);
182 }
183
184 hosts.clear();
185 removedHosts.clear();
186 locations.clear();
187 portAddresses.clear();
188
Madan Jampaniab994042014-10-13 15:34:04 -0700189 log.info("Stopped");
190 }
191
192 @Override
193 public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
194 HostDescription hostDescription) {
195 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700196 HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
197 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800198 log.debug("Notifying peers of a host topology event for providerId: "
Madan Jampani312a2982014-10-14 21:07:16 -0700199 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
200 try {
201 notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
202 } catch (IOException e) {
203 log.error("Failed to notify peers of a host topology event for providerId: "
204 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
205 }
206 }
207 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700208 }
209
210 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
211 HostDescription hostDescription, Timestamp timestamp) {
212 StoredHost host = hosts.get(hostId);
213 if (host == null) {
214 return createHost(providerId, hostId, hostDescription, timestamp);
215 }
216 return updateHost(providerId, host, hostDescription, timestamp);
217 }
218
219 // creates a new host and sends HOST_ADDED
220 private HostEvent createHost(ProviderId providerId, HostId hostId,
221 HostDescription descr, Timestamp timestamp) {
222 synchronized (this) {
223 // If this host was previously removed, first ensure
224 // this new request is "newer"
225 if (removedHosts.containsKey(hostId)) {
226 if (removedHosts.get(hostId).isNewer(timestamp)) {
227 return null;
228 } else {
229 removedHosts.remove(hostId);
230 }
231 }
232 StoredHost newhost = new StoredHost(providerId, hostId,
233 descr.hwAddress(),
234 descr.vlan(),
235 new Timestamped<>(descr.location(), timestamp),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700236 ImmutableSet.copyOf(descr.ipAddress()));
Madan Jampaniab994042014-10-13 15:34:04 -0700237 hosts.put(hostId, newhost);
238 locations.put(descr.location(), newhost);
239 return new HostEvent(HOST_ADDED, newhost);
240 }
241 }
242
243 // checks for type of update to host, sends appropriate event
244 private HostEvent updateHost(ProviderId providerId, StoredHost host,
245 HostDescription descr, Timestamp timestamp) {
246 HostEvent event;
247 if (!host.location.isNewer(timestamp) && !host.location().equals(descr.location())) {
248 host.setLocation(new Timestamped<>(descr.location(), timestamp));
249 return new HostEvent(HOST_MOVED, host);
250 }
251
Thomas Vachuskacd2920c2014-11-19 14:49:55 -0800252 if (host.ipAddresses().containsAll(descr.ipAddress()) &&
253 descr.annotations().keys().isEmpty()) {
Madan Jampaniab994042014-10-13 15:34:04 -0700254 return null;
255 }
256
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700257 Set<IpAddress> addresses = new HashSet<>(host.ipAddresses());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700258 addresses.addAll(descr.ipAddress());
Thomas Vachuskacd2920c2014-11-19 14:49:55 -0800259 Annotations annotations = merge((DefaultAnnotations) host.annotations(),
260 descr.annotations());
Madan Jampaniab994042014-10-13 15:34:04 -0700261 StoredHost updated = new StoredHost(providerId, host.id(),
262 host.mac(), host.vlan(),
Thomas Vachuskacd2920c2014-11-19 14:49:55 -0800263 host.location, addresses,
264 annotations);
Madan Jampaniab994042014-10-13 15:34:04 -0700265 event = new HostEvent(HOST_UPDATED, updated);
266 synchronized (this) {
267 hosts.put(host.id(), updated);
268 locations.remove(host.location(), host);
269 locations.put(updated.location(), updated);
270 }
271 return event;
272 }
273
274 @Override
275 public HostEvent removeHost(HostId hostId) {
276 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700277 HostEvent event = removeHostInternal(hostId, timestamp);
278 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800279 log.debug("Notifying peers of a host removed topology event for hostId: {}", hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700280 try {
281 notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
282 } catch (IOException e) {
283 log.info("Failed to notify peers of a host removed topology event for hostId: {}", hostId);
284 }
285 }
286 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700287 }
288
289 private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
290 synchronized (this) {
291 Host host = hosts.remove(hostId);
292 if (host != null) {
293 locations.remove((host.location()), host);
294 removedHosts.put(hostId, new Timestamped<>(host, timestamp));
295 return new HostEvent(HOST_REMOVED, host);
296 }
297 return null;
298 }
299 }
300
301 @Override
302 public int getHostCount() {
303 return hosts.size();
304 }
305
306 @Override
307 public Iterable<Host> getHosts() {
308 return ImmutableSet.<Host>copyOf(hosts.values());
309 }
310
311 @Override
312 public Host getHost(HostId hostId) {
313 return hosts.get(hostId);
314 }
315
316 @Override
317 public Set<Host> getHosts(VlanId vlanId) {
318 Set<Host> vlanset = new HashSet<>();
319 for (Host h : hosts.values()) {
320 if (h.vlan().equals(vlanId)) {
321 vlanset.add(h);
322 }
323 }
324 return vlanset;
325 }
326
327 @Override
328 public Set<Host> getHosts(MacAddress mac) {
329 Set<Host> macset = new HashSet<>();
330 for (Host h : hosts.values()) {
331 if (h.mac().equals(mac)) {
332 macset.add(h);
333 }
334 }
335 return macset;
336 }
337
338 @Override
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700339 public Set<Host> getHosts(IpAddress ip) {
Madan Jampaniab994042014-10-13 15:34:04 -0700340 Set<Host> ipset = new HashSet<>();
341 for (Host h : hosts.values()) {
342 if (h.ipAddresses().contains(ip)) {
343 ipset.add(h);
344 }
345 }
346 return ipset;
347 }
348
349 @Override
350 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
351 return ImmutableSet.copyOf(locations.get(connectPoint));
352 }
353
354 @Override
355 public Set<Host> getConnectedHosts(DeviceId deviceId) {
356 Set<Host> hostset = new HashSet<>();
357 for (ConnectPoint p : locations.keySet()) {
358 if (p.deviceId().equals(deviceId)) {
359 hostset.addAll(locations.get(p));
360 }
361 }
362 return hostset;
363 }
364
365 @Override
366 public void updateAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800367 portAddresses.put(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700368 }
369
370 @Override
371 public void removeAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800372 portAddresses.remove(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700373 }
374
375 @Override
376 public void clearAddressBindings(ConnectPoint connectPoint) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800377 portAddresses.removeAll(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700378 }
379
380 @Override
381 public Set<PortAddresses> getAddressBindings() {
382 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800383 return ImmutableSet.copyOf(portAddresses.values());
Madan Jampaniab994042014-10-13 15:34:04 -0700384 }
385 }
386
387 @Override
Jonathan Harta887ba82014-11-03 15:20:52 -0800388 public Set<PortAddresses> getAddressBindingsForPort(ConnectPoint connectPoint) {
Madan Jampaniab994042014-10-13 15:34:04 -0700389 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800390 Set<PortAddresses> addresses = portAddresses.get(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700391
Jonathan Harta887ba82014-11-03 15:20:52 -0800392 if (addresses == null) {
393 return Collections.emptySet();
394 } else {
395 return ImmutableSet.copyOf(addresses);
396 }
Madan Jampaniab994042014-10-13 15:34:04 -0700397 }
Madan Jampaniab994042014-10-13 15:34:04 -0700398 }
399
400 // Auxiliary extension to allow location to mutate.
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700401 private static final class StoredHost extends DefaultHost {
Madan Jampaniab994042014-10-13 15:34:04 -0700402 private Timestamped<HostLocation> location;
403
404 /**
405 * Creates an end-station host using the supplied information.
406 *
407 * @param providerId provider identity
408 * @param id host identifier
409 * @param mac host MAC address
410 * @param vlan host VLAN identifier
411 * @param location host location
412 * @param ips host IP addresses
413 * @param annotations optional key/value annotations
414 */
415 public StoredHost(ProviderId providerId, HostId id,
416 MacAddress mac, VlanId vlan, Timestamped<HostLocation> location,
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700417 Set<IpAddress> ips, Annotations... annotations) {
Madan Jampaniab994042014-10-13 15:34:04 -0700418 super(providerId, id, mac, vlan, location.value(), ips, annotations);
419 this.location = location;
420 }
421
422 void setLocation(Timestamped<HostLocation> location) {
423 this.location = location;
424 }
425
426 @Override
427 public HostLocation location() {
428 return location.value();
429 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700430
431 public Timestamp timestamp() {
432 return location.timestamp();
433 }
Madan Jampaniab994042014-10-13 15:34:04 -0700434 }
Madan Jampani312a2982014-10-14 21:07:16 -0700435
436 private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
437 broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event);
438 }
439
440 private void notifyPeers(InternalHostEvent event) throws IOException {
441 broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event);
442 }
443
444 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
445 ClusterMessage message = new ClusterMessage(
446 clusterService.getLocalNode().id(),
447 subject,
448 SERIALIZER.encode(event));
449 clusterCommunicator.broadcast(message);
450 }
Madan Jampani255618f2014-10-14 23:27:57 -0700451
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700452 private void unicastMessage(NodeId peer,
453 MessageSubject subject,
454 Object event) throws IOException {
455 ClusterMessage message = new ClusterMessage(
456 clusterService.getLocalNode().id(),
457 subject,
458 SERIALIZER.encode(event));
459 clusterCommunicator.unicast(message, peer);
460 }
461
Madan Jampani255618f2014-10-14 23:27:57 -0700462 private void notifyDelegateIfNotNull(HostEvent event) {
463 if (event != null) {
464 notifyDelegate(event);
465 }
466 }
467
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800468 private final class InternalHostEventListener
469 implements ClusterMessageHandler {
Madan Jampani255618f2014-10-14 23:27:57 -0700470 @Override
471 public void handle(ClusterMessage message) {
472
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800473 log.debug("Received host update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800474 InternalHostEvent event = SERIALIZER.decode(message.payload());
Madan Jampani255618f2014-10-14 23:27:57 -0700475
476 ProviderId providerId = event.providerId();
477 HostId hostId = event.hostId();
478 HostDescription hostDescription = event.hostDescription();
479 Timestamp timestamp = event.timestamp();
480
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800481 executor.submit(new Runnable() {
482
483 @Override
484 public void run() {
485 try {
486 notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
487 hostId,
488 hostDescription,
489 timestamp));
490 } catch (Exception e) {
491 log.warn("Exception thrown handling host removed", e);
492 }
493 }
494 });
Madan Jampani255618f2014-10-14 23:27:57 -0700495 }
496 }
497
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800498 private final class InternalHostRemovedEventListener
499 implements ClusterMessageHandler {
Madan Jampani255618f2014-10-14 23:27:57 -0700500 @Override
501 public void handle(ClusterMessage message) {
502
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800503 log.debug("Received host removed event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800504 InternalHostRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani255618f2014-10-14 23:27:57 -0700505
506 HostId hostId = event.hostId();
507 Timestamp timestamp = event.timestamp();
508
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800509 executor.submit(new Runnable() {
510
511 @Override
512 public void run() {
513 try {
514 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
515 } catch (Exception e) {
516 log.warn("Exception thrown handling host removed", e);
517 }
518 }
519 });
Madan Jampani255618f2014-10-14 23:27:57 -0700520 }
521 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700522
523 private final class SendAdvertisementTask implements Runnable {
524
525 @Override
526 public void run() {
527 if (Thread.currentThread().isInterrupted()) {
528 log.info("Interrupted, quitting");
529 return;
530 }
531
532 try {
533 final NodeId self = clusterService.getLocalNode().id();
534 Set<ControllerNode> nodes = clusterService.getNodes();
535
536 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
537 .transform(toNodeId())
538 .toList();
539
540 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800541 log.trace("No other peers in the cluster.");
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700542 return;
543 }
544
545 NodeId peer;
546 do {
547 int idx = RandomUtils.nextInt(0, nodeIds.size());
548 peer = nodeIds.get(idx);
549 } while (peer.equals(self));
550
551 HostAntiEntropyAdvertisement ad = createAdvertisement();
552
553 if (Thread.currentThread().isInterrupted()) {
554 log.info("Interrupted, quitting");
555 return;
556 }
557
558 try {
559 unicastMessage(peer, GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
560 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -0700561 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700562 return;
563 }
564 } catch (Exception e) {
565 // catch all Exception to avoid Scheduled task being suppressed.
566 log.error("Exception thrown while sending advertisement", e);
567 }
568 }
569 }
570
571 private HostAntiEntropyAdvertisement createAdvertisement() {
572 final NodeId self = clusterService.getLocalNode().id();
573
574 Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
575 Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
576
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800577 hosts.forEach((hostId, hostInfo) -> {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700578 final ProviderId providerId = hostInfo.providerId();
579 timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800580 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700581
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800582 removedHosts.forEach((hostId, timestamped) -> {
583 tombstones.put(hostId, timestamped.timestamp());
584 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700585
586 return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
587 }
588
589 private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
590
591 final NodeId sender = ad.sender();
592
593 for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
594 // for each locally live Hosts...
595 final HostId hostId = host.getKey();
596 final StoredHost localHost = host.getValue();
597 final ProviderId providerId = localHost.providerId();
598 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
599 final Timestamp localLiveTimestamp = localHost.timestamp();
600
601 Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
602 if (remoteTimestamp == null) {
603 remoteTimestamp = ad.tombstones().get(hostId);
604 }
605 if (remoteTimestamp == null ||
606 localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
607
608 // local is more recent, push
609 // TODO: annotation is lost
610 final HostDescription desc = new DefaultHostDescription(
611 localHost.mac(),
612 localHost.vlan(),
613 localHost.location(),
614 localHost.ipAddresses());
615 try {
616 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED,
617 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
618 } catch (IOException e1) {
619 log.debug("Failed to send advertisement response", e1);
620 }
621 }
622
623 final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
624 if (remoteDeadTimestamp != null &&
625 remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
626 // sender has recent remove
627 notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
628 }
629 }
630
631 for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
632 // for each locally dead Hosts
633 final HostId hostId = dead.getKey();
634 final Timestamp localDeadTimestamp = dead.getValue().timestamp();
635
636 // TODO: pick proper ProviderId, when supporting multi-provider
637 final ProviderId providerId = dead.getValue().value().providerId();
638 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
639
640 final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
641 if (remoteLiveTimestamp != null &&
642 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
643 // sender has zombie, push
644 try {
645 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED,
646 new InternalHostRemovedEvent(hostId, localDeadTimestamp));
647 } catch (IOException e1) {
648 log.debug("Failed to send advertisement response", e1);
649 }
650 }
651 }
652
653
654 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
655 // for each remote tombstone advertisement...
656 final HostId hostId = e.getKey();
657 final Timestamp adRemoveTimestamp = e.getValue();
658
659 final StoredHost storedHost = hosts.get(hostId);
660 if (storedHost == null) {
661 continue;
662 }
663 if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
664 // sender has recent remove info, locally remove
665 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
666 }
667 }
668 }
669
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800670 private final class InternalHostAntiEntropyAdvertisementListener
671 implements ClusterMessageHandler {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700672
673 @Override
674 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800675 log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700676 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800677 backgroundExecutor.submit(new Runnable() {
678
679 @Override
680 public void run() {
681 try {
682 handleAntiEntropyAdvertisement(advertisement);
683 } catch (Exception e) {
684 log.warn("Exception thrown handling Host advertisements", e);
685 }
686 }
687 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700688 }
689 }
Madan Jampaniab994042014-10-13 15:34:04 -0700690}