blob: 5e7048a832538b07fe1f4260e30414e0fb28e7e2 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampaniab994042014-10-13 15:34:04 -070016package org.onlab.onos.store.host.impl;
17
Jonathan Harta887ba82014-11-03 15:20:52 -080018import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
19import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
Thomas Vachuskacd2920c2014-11-19 14:49:55 -080020import static org.onlab.onos.net.DefaultAnnotations.merge;
Jonathan Harta887ba82014-11-03 15:20:52 -080021import static org.onlab.onos.net.host.HostEvent.Type.HOST_ADDED;
22import static org.onlab.onos.net.host.HostEvent.Type.HOST_MOVED;
23import static org.onlab.onos.net.host.HostEvent.Type.HOST_REMOVED;
24import static org.onlab.onos.net.host.HostEvent.Type.HOST_UPDATED;
25import static org.onlab.util.Tools.namedThreads;
Yuta HIGUCHI06586272014-11-25 14:27:03 -080026import static org.onlab.util.Tools.minPriority;
Jonathan Harta887ba82014-11-03 15:20:52 -080027import static org.slf4j.LoggerFactory.getLogger;
28
29import java.io.IOException;
30import java.util.Collections;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.Map;
34import java.util.Map.Entry;
35import java.util.Set;
36import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHI80d56592014-11-25 15:11:13 -080037import java.util.concurrent.ExecutorService;
38import java.util.concurrent.Executors;
Jonathan Harta887ba82014-11-03 15:20:52 -080039import java.util.concurrent.ScheduledExecutorService;
40import java.util.concurrent.TimeUnit;
Madan Jampaniab994042014-10-13 15:34:04 -070041
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070042import org.apache.commons.lang3.RandomUtils;
Madan Jampaniab994042014-10-13 15:34:04 -070043import org.apache.felix.scr.annotations.Activate;
44import org.apache.felix.scr.annotations.Component;
45import org.apache.felix.scr.annotations.Deactivate;
46import org.apache.felix.scr.annotations.Reference;
47import org.apache.felix.scr.annotations.ReferenceCardinality;
48import org.apache.felix.scr.annotations.Service;
49import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070050import org.onlab.onos.cluster.ControllerNode;
51import org.onlab.onos.cluster.NodeId;
Madan Jampaniab994042014-10-13 15:34:04 -070052import org.onlab.onos.net.Annotations;
53import org.onlab.onos.net.ConnectPoint;
Thomas Vachuskacd2920c2014-11-19 14:49:55 -080054import org.onlab.onos.net.DefaultAnnotations;
Madan Jampaniab994042014-10-13 15:34:04 -070055import org.onlab.onos.net.DefaultHost;
56import org.onlab.onos.net.DeviceId;
57import org.onlab.onos.net.Host;
58import org.onlab.onos.net.HostId;
59import org.onlab.onos.net.HostLocation;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070060import org.onlab.onos.net.host.DefaultHostDescription;
Madan Jampaniab994042014-10-13 15:34:04 -070061import org.onlab.onos.net.host.HostClockService;
62import org.onlab.onos.net.host.HostDescription;
63import org.onlab.onos.net.host.HostEvent;
64import org.onlab.onos.net.host.HostStore;
65import org.onlab.onos.net.host.HostStoreDelegate;
66import org.onlab.onos.net.host.PortAddresses;
67import org.onlab.onos.net.provider.ProviderId;
68import org.onlab.onos.store.AbstractStore;
69import org.onlab.onos.store.Timestamp;
70import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani312a2982014-10-14 21:07:16 -070071import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Madan Jampani255618f2014-10-14 23:27:57 -070072import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani312a2982014-10-14 21:07:16 -070073import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHIeecee552014-10-16 14:09:01 -070074import org.onlab.onos.store.impl.Timestamped;
Madan Jampani312a2982014-10-14 21:07:16 -070075import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI60a190b2014-11-07 16:24:47 -080076import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -070077import org.onlab.packet.IpAddress;
Madan Jampaniab994042014-10-13 15:34:04 -070078import org.onlab.packet.MacAddress;
79import org.onlab.packet.VlanId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070080import org.onlab.util.KryoNamespace;
Madan Jampaniab994042014-10-13 15:34:04 -070081import org.slf4j.Logger;
82
Jonathan Harta887ba82014-11-03 15:20:52 -080083import com.google.common.collect.FluentIterable;
84import com.google.common.collect.HashMultimap;
85import com.google.common.collect.ImmutableList;
86import com.google.common.collect.ImmutableSet;
87import com.google.common.collect.Multimap;
88import com.google.common.collect.Multimaps;
89import com.google.common.collect.SetMultimap;
Madan Jampaniab994042014-10-13 15:34:04 -070090
Yuta HIGUCHIa2639152014-10-14 15:08:10 -070091//TODO: multi-provider, annotation not supported.
Madan Jampaniab994042014-10-13 15:34:04 -070092/**
93 * Manages inventory of end-station hosts in distributed data store
94 * that uses optimistic replication and gossip based techniques.
95 */
96@Component(immediate = true)
97@Service
98public class GossipHostStore
99 extends AbstractStore<HostEvent, HostStoreDelegate>
100 implements HostStore {
101
102 private final Logger log = getLogger(getClass());
103
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800104 // TODO: make this configurable
105 private int hostsExpected = 2000000;
Madan Jampaniab994042014-10-13 15:34:04 -0700106
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800107 // Host inventory
108 private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
109
110 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
Madan Jampaniab994042014-10-13 15:34:04 -0700111
112 // Hosts tracked by their location
113 private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
114
Jonathan Harta887ba82014-11-03 15:20:52 -0800115 private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
116 Multimaps.synchronizedSetMultimap(
117 HashMultimap.<ConnectPoint, PortAddresses>create());
Madan Jampaniab994042014-10-13 15:34:04 -0700118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
120 protected HostClockService hostClockService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected ClusterCommunicationService clusterCommunicator;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected ClusterService clusterService;
127
Madan Jampani312a2982014-10-14 21:07:16 -0700128 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
129 @Override
130 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700131 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800132 .register(DistributedStoreSerializers.STORE_COMMON)
133 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700134 .register(InternalHostEvent.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700135 .register(InternalHostRemovedEvent.class)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700136 .register(HostFragmentId.class)
137 .register(HostAntiEntropyAdvertisement.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800138 .build();
Madan Jampani312a2982014-10-14 21:07:16 -0700139 }
140 };
141
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800142 private ExecutorService executor;
143
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800144 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700145
Madan Jampaniab994042014-10-13 15:34:04 -0700146 @Activate
147 public void activate() {
Madan Jampani255618f2014-10-14 23:27:57 -0700148 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700149 GossipHostStoreMessageSubjects.HOST_UPDATED,
150 new InternalHostEventListener());
Madan Jampani255618f2014-10-14 23:27:57 -0700151 clusterCommunicator.addSubscriber(
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700152 GossipHostStoreMessageSubjects.HOST_REMOVED,
153 new InternalHostRemovedEventListener());
154 clusterCommunicator.addSubscriber(
155 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
156 new InternalHostAntiEntropyAdvertisementListener());
157
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800158 executor = Executors.newCachedThreadPool(namedThreads("host-fg-%d"));
159
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800160 backgroundExecutor =
161 newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d")));
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700162
163 // TODO: Make these configurable
164 long initialDelaySec = 5;
165 long periodSec = 5;
166 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800167 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700168 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani255618f2014-10-14 23:27:57 -0700169
Madan Jampaniab994042014-10-13 15:34:04 -0700170 log.info("Started");
171 }
172
173 @Deactivate
174 public void deactivate() {
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800175 executor.shutdownNow();
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800176 backgroundExecutor.shutdownNow();
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700177 try {
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800178 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700179 log.error("Timeout during executor shutdown");
180 }
181 } catch (InterruptedException e) {
182 log.error("Error during executor shutdown", e);
183 }
184
185 hosts.clear();
186 removedHosts.clear();
187 locations.clear();
188 portAddresses.clear();
189
Madan Jampaniab994042014-10-13 15:34:04 -0700190 log.info("Stopped");
191 }
192
193 @Override
194 public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
195 HostDescription hostDescription) {
196 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700197 HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
198 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800199 log.debug("Notifying peers of a host topology event for providerId: "
Madan Jampani312a2982014-10-14 21:07:16 -0700200 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
201 try {
202 notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
203 } catch (IOException e) {
204 log.error("Failed to notify peers of a host topology event for providerId: "
205 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
206 }
207 }
208 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700209 }
210
211 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
212 HostDescription hostDescription, Timestamp timestamp) {
213 StoredHost host = hosts.get(hostId);
214 if (host == null) {
215 return createHost(providerId, hostId, hostDescription, timestamp);
216 }
217 return updateHost(providerId, host, hostDescription, timestamp);
218 }
219
220 // creates a new host and sends HOST_ADDED
221 private HostEvent createHost(ProviderId providerId, HostId hostId,
222 HostDescription descr, Timestamp timestamp) {
223 synchronized (this) {
224 // If this host was previously removed, first ensure
225 // this new request is "newer"
226 if (removedHosts.containsKey(hostId)) {
227 if (removedHosts.get(hostId).isNewer(timestamp)) {
228 return null;
229 } else {
230 removedHosts.remove(hostId);
231 }
232 }
233 StoredHost newhost = new StoredHost(providerId, hostId,
234 descr.hwAddress(),
235 descr.vlan(),
236 new Timestamped<>(descr.location(), timestamp),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700237 ImmutableSet.copyOf(descr.ipAddress()));
Madan Jampaniab994042014-10-13 15:34:04 -0700238 hosts.put(hostId, newhost);
239 locations.put(descr.location(), newhost);
240 return new HostEvent(HOST_ADDED, newhost);
241 }
242 }
243
244 // checks for type of update to host, sends appropriate event
245 private HostEvent updateHost(ProviderId providerId, StoredHost host,
246 HostDescription descr, Timestamp timestamp) {
247 HostEvent event;
248 if (!host.location.isNewer(timestamp) && !host.location().equals(descr.location())) {
249 host.setLocation(new Timestamped<>(descr.location(), timestamp));
250 return new HostEvent(HOST_MOVED, host);
251 }
252
Thomas Vachuskacd2920c2014-11-19 14:49:55 -0800253 if (host.ipAddresses().containsAll(descr.ipAddress()) &&
254 descr.annotations().keys().isEmpty()) {
Madan Jampaniab994042014-10-13 15:34:04 -0700255 return null;
256 }
257
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700258 Set<IpAddress> addresses = new HashSet<>(host.ipAddresses());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700259 addresses.addAll(descr.ipAddress());
Thomas Vachuskacd2920c2014-11-19 14:49:55 -0800260 Annotations annotations = merge((DefaultAnnotations) host.annotations(),
261 descr.annotations());
Madan Jampaniab994042014-10-13 15:34:04 -0700262 StoredHost updated = new StoredHost(providerId, host.id(),
263 host.mac(), host.vlan(),
Thomas Vachuskacd2920c2014-11-19 14:49:55 -0800264 host.location, addresses,
265 annotations);
Madan Jampaniab994042014-10-13 15:34:04 -0700266 event = new HostEvent(HOST_UPDATED, updated);
267 synchronized (this) {
268 hosts.put(host.id(), updated);
269 locations.remove(host.location(), host);
270 locations.put(updated.location(), updated);
271 }
272 return event;
273 }
274
275 @Override
276 public HostEvent removeHost(HostId hostId) {
277 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700278 HostEvent event = removeHostInternal(hostId, timestamp);
279 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800280 log.debug("Notifying peers of a host removed topology event for hostId: {}", hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700281 try {
282 notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
283 } catch (IOException e) {
284 log.info("Failed to notify peers of a host removed topology event for hostId: {}", hostId);
285 }
286 }
287 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700288 }
289
290 private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
291 synchronized (this) {
292 Host host = hosts.remove(hostId);
293 if (host != null) {
294 locations.remove((host.location()), host);
295 removedHosts.put(hostId, new Timestamped<>(host, timestamp));
296 return new HostEvent(HOST_REMOVED, host);
297 }
298 return null;
299 }
300 }
301
302 @Override
303 public int getHostCount() {
304 return hosts.size();
305 }
306
307 @Override
308 public Iterable<Host> getHosts() {
309 return ImmutableSet.<Host>copyOf(hosts.values());
310 }
311
312 @Override
313 public Host getHost(HostId hostId) {
314 return hosts.get(hostId);
315 }
316
317 @Override
318 public Set<Host> getHosts(VlanId vlanId) {
319 Set<Host> vlanset = new HashSet<>();
320 for (Host h : hosts.values()) {
321 if (h.vlan().equals(vlanId)) {
322 vlanset.add(h);
323 }
324 }
325 return vlanset;
326 }
327
328 @Override
329 public Set<Host> getHosts(MacAddress mac) {
330 Set<Host> macset = new HashSet<>();
331 for (Host h : hosts.values()) {
332 if (h.mac().equals(mac)) {
333 macset.add(h);
334 }
335 }
336 return macset;
337 }
338
339 @Override
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700340 public Set<Host> getHosts(IpAddress ip) {
Madan Jampaniab994042014-10-13 15:34:04 -0700341 Set<Host> ipset = new HashSet<>();
342 for (Host h : hosts.values()) {
343 if (h.ipAddresses().contains(ip)) {
344 ipset.add(h);
345 }
346 }
347 return ipset;
348 }
349
350 @Override
351 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
352 return ImmutableSet.copyOf(locations.get(connectPoint));
353 }
354
355 @Override
356 public Set<Host> getConnectedHosts(DeviceId deviceId) {
357 Set<Host> hostset = new HashSet<>();
358 for (ConnectPoint p : locations.keySet()) {
359 if (p.deviceId().equals(deviceId)) {
360 hostset.addAll(locations.get(p));
361 }
362 }
363 return hostset;
364 }
365
366 @Override
367 public void updateAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800368 portAddresses.put(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700369 }
370
371 @Override
372 public void removeAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800373 portAddresses.remove(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700374 }
375
376 @Override
377 public void clearAddressBindings(ConnectPoint connectPoint) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800378 portAddresses.removeAll(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700379 }
380
381 @Override
382 public Set<PortAddresses> getAddressBindings() {
383 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800384 return ImmutableSet.copyOf(portAddresses.values());
Madan Jampaniab994042014-10-13 15:34:04 -0700385 }
386 }
387
388 @Override
Jonathan Harta887ba82014-11-03 15:20:52 -0800389 public Set<PortAddresses> getAddressBindingsForPort(ConnectPoint connectPoint) {
Madan Jampaniab994042014-10-13 15:34:04 -0700390 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800391 Set<PortAddresses> addresses = portAddresses.get(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700392
Jonathan Harta887ba82014-11-03 15:20:52 -0800393 if (addresses == null) {
394 return Collections.emptySet();
395 } else {
396 return ImmutableSet.copyOf(addresses);
397 }
Madan Jampaniab994042014-10-13 15:34:04 -0700398 }
Madan Jampaniab994042014-10-13 15:34:04 -0700399 }
400
401 // Auxiliary extension to allow location to mutate.
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700402 private static final class StoredHost extends DefaultHost {
Madan Jampaniab994042014-10-13 15:34:04 -0700403 private Timestamped<HostLocation> location;
404
405 /**
406 * Creates an end-station host using the supplied information.
407 *
408 * @param providerId provider identity
409 * @param id host identifier
410 * @param mac host MAC address
411 * @param vlan host VLAN identifier
412 * @param location host location
413 * @param ips host IP addresses
414 * @param annotations optional key/value annotations
415 */
416 public StoredHost(ProviderId providerId, HostId id,
417 MacAddress mac, VlanId vlan, Timestamped<HostLocation> location,
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700418 Set<IpAddress> ips, Annotations... annotations) {
Madan Jampaniab994042014-10-13 15:34:04 -0700419 super(providerId, id, mac, vlan, location.value(), ips, annotations);
420 this.location = location;
421 }
422
423 void setLocation(Timestamped<HostLocation> location) {
424 this.location = location;
425 }
426
427 @Override
428 public HostLocation location() {
429 return location.value();
430 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700431
432 public Timestamp timestamp() {
433 return location.timestamp();
434 }
Madan Jampaniab994042014-10-13 15:34:04 -0700435 }
Madan Jampani312a2982014-10-14 21:07:16 -0700436
437 private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
438 broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event);
439 }
440
441 private void notifyPeers(InternalHostEvent event) throws IOException {
442 broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event);
443 }
444
445 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
446 ClusterMessage message = new ClusterMessage(
447 clusterService.getLocalNode().id(),
448 subject,
449 SERIALIZER.encode(event));
450 clusterCommunicator.broadcast(message);
451 }
Madan Jampani255618f2014-10-14 23:27:57 -0700452
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700453 private void unicastMessage(NodeId peer,
454 MessageSubject subject,
455 Object event) throws IOException {
456 ClusterMessage message = new ClusterMessage(
457 clusterService.getLocalNode().id(),
458 subject,
459 SERIALIZER.encode(event));
460 clusterCommunicator.unicast(message, peer);
461 }
462
Madan Jampani255618f2014-10-14 23:27:57 -0700463 private void notifyDelegateIfNotNull(HostEvent event) {
464 if (event != null) {
465 notifyDelegate(event);
466 }
467 }
468
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800469 private final class InternalHostEventListener
470 implements ClusterMessageHandler {
Madan Jampani255618f2014-10-14 23:27:57 -0700471 @Override
472 public void handle(ClusterMessage message) {
473
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800474 log.debug("Received host update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800475 InternalHostEvent event = SERIALIZER.decode(message.payload());
Madan Jampani255618f2014-10-14 23:27:57 -0700476
477 ProviderId providerId = event.providerId();
478 HostId hostId = event.hostId();
479 HostDescription hostDescription = event.hostDescription();
480 Timestamp timestamp = event.timestamp();
481
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800482 executor.submit(new Runnable() {
483
484 @Override
485 public void run() {
486 try {
487 notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
488 hostId,
489 hostDescription,
490 timestamp));
491 } catch (Exception e) {
492 log.warn("Exception thrown handling host removed", e);
493 }
494 }
495 });
Madan Jampani255618f2014-10-14 23:27:57 -0700496 }
497 }
498
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800499 private final class InternalHostRemovedEventListener
500 implements ClusterMessageHandler {
Madan Jampani255618f2014-10-14 23:27:57 -0700501 @Override
502 public void handle(ClusterMessage message) {
503
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800504 log.debug("Received host removed event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800505 InternalHostRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani255618f2014-10-14 23:27:57 -0700506
507 HostId hostId = event.hostId();
508 Timestamp timestamp = event.timestamp();
509
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800510 executor.submit(new Runnable() {
511
512 @Override
513 public void run() {
514 try {
515 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
516 } catch (Exception e) {
517 log.warn("Exception thrown handling host removed", e);
518 }
519 }
520 });
Madan Jampani255618f2014-10-14 23:27:57 -0700521 }
522 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700523
524 private final class SendAdvertisementTask implements Runnable {
525
526 @Override
527 public void run() {
528 if (Thread.currentThread().isInterrupted()) {
529 log.info("Interrupted, quitting");
530 return;
531 }
532
533 try {
534 final NodeId self = clusterService.getLocalNode().id();
535 Set<ControllerNode> nodes = clusterService.getNodes();
536
537 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
538 .transform(toNodeId())
539 .toList();
540
541 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800542 log.trace("No other peers in the cluster.");
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700543 return;
544 }
545
546 NodeId peer;
547 do {
548 int idx = RandomUtils.nextInt(0, nodeIds.size());
549 peer = nodeIds.get(idx);
550 } while (peer.equals(self));
551
552 HostAntiEntropyAdvertisement ad = createAdvertisement();
553
554 if (Thread.currentThread().isInterrupted()) {
555 log.info("Interrupted, quitting");
556 return;
557 }
558
559 try {
560 unicastMessage(peer, GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
561 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -0700562 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700563 return;
564 }
565 } catch (Exception e) {
566 // catch all Exception to avoid Scheduled task being suppressed.
567 log.error("Exception thrown while sending advertisement", e);
568 }
569 }
570 }
571
572 private HostAntiEntropyAdvertisement createAdvertisement() {
573 final NodeId self = clusterService.getLocalNode().id();
574
575 Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
576 Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
577
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800578 hosts.forEach((hostId, hostInfo) -> {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700579 final ProviderId providerId = hostInfo.providerId();
580 timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800581 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700582
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800583 removedHosts.forEach((hostId, timestamped) -> {
584 tombstones.put(hostId, timestamped.timestamp());
585 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700586
587 return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
588 }
589
590 private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
591
592 final NodeId sender = ad.sender();
593
594 for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
595 // for each locally live Hosts...
596 final HostId hostId = host.getKey();
597 final StoredHost localHost = host.getValue();
598 final ProviderId providerId = localHost.providerId();
599 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
600 final Timestamp localLiveTimestamp = localHost.timestamp();
601
602 Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
603 if (remoteTimestamp == null) {
604 remoteTimestamp = ad.tombstones().get(hostId);
605 }
606 if (remoteTimestamp == null ||
607 localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
608
609 // local is more recent, push
610 // TODO: annotation is lost
611 final HostDescription desc = new DefaultHostDescription(
612 localHost.mac(),
613 localHost.vlan(),
614 localHost.location(),
615 localHost.ipAddresses());
616 try {
617 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED,
618 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
619 } catch (IOException e1) {
620 log.debug("Failed to send advertisement response", e1);
621 }
622 }
623
624 final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
625 if (remoteDeadTimestamp != null &&
626 remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
627 // sender has recent remove
628 notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
629 }
630 }
631
632 for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
633 // for each locally dead Hosts
634 final HostId hostId = dead.getKey();
635 final Timestamp localDeadTimestamp = dead.getValue().timestamp();
636
637 // TODO: pick proper ProviderId, when supporting multi-provider
638 final ProviderId providerId = dead.getValue().value().providerId();
639 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
640
641 final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
642 if (remoteLiveTimestamp != null &&
643 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
644 // sender has zombie, push
645 try {
646 unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED,
647 new InternalHostRemovedEvent(hostId, localDeadTimestamp));
648 } catch (IOException e1) {
649 log.debug("Failed to send advertisement response", e1);
650 }
651 }
652 }
653
654
655 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
656 // for each remote tombstone advertisement...
657 final HostId hostId = e.getKey();
658 final Timestamp adRemoveTimestamp = e.getValue();
659
660 final StoredHost storedHost = hosts.get(hostId);
661 if (storedHost == null) {
662 continue;
663 }
664 if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
665 // sender has recent remove info, locally remove
666 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
667 }
668 }
669 }
670
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800671 private final class InternalHostAntiEntropyAdvertisementListener
672 implements ClusterMessageHandler {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700673
674 @Override
675 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800676 log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700677 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800678 backgroundExecutor.submit(new Runnable() {
679
680 @Override
681 public void run() {
682 try {
683 handleAntiEntropyAdvertisement(advertisement);
684 } catch (Exception e) {
685 log.warn("Exception thrown handling Host advertisements", e);
686 }
687 }
688 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700689 }
690 }
Madan Jampaniab994042014-10-13 15:34:04 -0700691}