/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.host.impl;
Madan Jampaniab994042014-10-13 15:34:04 -070017
Jonathan Hart7d656f42015-01-27 14:07:23 -080018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.HashMultimap;
20import com.google.common.collect.ImmutableList;
21import com.google.common.collect.ImmutableSet;
22import com.google.common.collect.Multimap;
23import com.google.common.collect.Multimaps;
24import com.google.common.collect.SetMultimap;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070025import org.apache.commons.lang3.RandomUtils;
Madan Jampaniab994042014-10-13 15:34:04 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
Jonathan Hart7d656f42015-01-27 14:07:23 -080032import org.onlab.packet.IpAddress;
33import org.onlab.packet.MacAddress;
34import org.onlab.packet.VlanId;
35import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080036import org.onosproject.cluster.ClusterService;
37import org.onosproject.cluster.ControllerNode;
38import org.onosproject.cluster.NodeId;
39import org.onosproject.net.Annotations;
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -080040import org.onosproject.net.AnnotationsUtil;
Brian O'Connorabafb502014-12-02 22:26:20 -080041import org.onosproject.net.ConnectPoint;
42import org.onosproject.net.DefaultAnnotations;
43import org.onosproject.net.DefaultHost;
44import org.onosproject.net.DeviceId;
45import org.onosproject.net.Host;
46import org.onosproject.net.HostId;
47import org.onosproject.net.HostLocation;
48import org.onosproject.net.host.DefaultHostDescription;
49import org.onosproject.net.host.HostClockService;
50import org.onosproject.net.host.HostDescription;
51import org.onosproject.net.host.HostEvent;
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -080052import org.onosproject.net.host.HostEvent.Type;
Brian O'Connorabafb502014-12-02 22:26:20 -080053import org.onosproject.net.host.HostStore;
54import org.onosproject.net.host.HostStoreDelegate;
55import org.onosproject.net.host.PortAddresses;
56import org.onosproject.net.provider.ProviderId;
57import org.onosproject.store.AbstractStore;
58import org.onosproject.store.Timestamp;
59import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60import org.onosproject.store.cluster.messaging.ClusterMessage;
61import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
62import org.onosproject.store.cluster.messaging.MessageSubject;
63import org.onosproject.store.impl.Timestamped;
64import org.onosproject.store.serializers.KryoSerializer;
Brian O'Connor6de2e202015-05-21 14:30:41 -070065import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
Madan Jampaniab994042014-10-13 15:34:04 -070066import org.slf4j.Logger;
67
Jonathan Hart7d656f42015-01-27 14:07:23 -080068import java.io.IOException;
69import java.util.Collections;
70import java.util.HashMap;
71import java.util.HashSet;
72import java.util.Map;
73import java.util.Map.Entry;
74import java.util.Set;
75import java.util.concurrent.ConcurrentHashMap;
76import java.util.concurrent.ExecutorService;
Jonathan Hart7d656f42015-01-27 14:07:23 -080077import java.util.concurrent.ScheduledExecutorService;
78import java.util.concurrent.TimeUnit;
79
80import static com.google.common.base.Preconditions.checkNotNull;
81import static com.google.common.collect.Multimaps.newSetMultimap;
82import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
83import static com.google.common.collect.Sets.newConcurrentHashSet;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080084import static java.util.concurrent.Executors.newCachedThreadPool;
Jonathan Hart7d656f42015-01-27 14:07:23 -080085import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080086import static org.onlab.util.Tools.groupedThreads;
Jonathan Hart7d656f42015-01-27 14:07:23 -080087import static org.onlab.util.Tools.minPriority;
Jonathan Hart7d656f42015-01-27 14:07:23 -080088import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
89import static org.onosproject.net.DefaultAnnotations.merge;
90import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
91import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080092import static org.onosproject.store.host.impl.GossipHostStoreMessageSubjects.*;
Jonathan Hart7d656f42015-01-27 14:07:23 -080093import static org.slf4j.LoggerFactory.getLogger;
Madan Jampaniab994042014-10-13 15:34:04 -070094
/**
 * Manages inventory of end-station hosts in a distributed data store
 * that uses optimistic replication and gossip-based techniques.
 */
99@Component(immediate = true)
100@Service
101public class GossipHostStore
102 extends AbstractStore<HostEvent, HostStoreDelegate>
103 implements HostStore {
104
105 private final Logger log = getLogger(getClass());
106
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800107 // TODO: make this configurable
108 private int hostsExpected = 2000000;
Madan Jampaniab994042014-10-13 15:34:04 -0700109
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800110 // Host inventory
111 private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
112
113 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
Madan Jampaniab994042014-10-13 15:34:04 -0700114
115 // Hosts tracked by their location
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800116 private final Multimap<ConnectPoint, Host> locations
117 = synchronizedSetMultimap(newSetMultimap(new ConcurrentHashMap<>(),
118 () -> newConcurrentHashSet()));
Madan Jampaniab994042014-10-13 15:34:04 -0700119
Jonathan Harta887ba82014-11-03 15:20:52 -0800120 private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
121 Multimaps.synchronizedSetMultimap(
122 HashMultimap.<ConnectPoint, PortAddresses>create());
Madan Jampaniab994042014-10-13 15:34:04 -0700123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected HostClockService hostClockService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected ClusterCommunicationService clusterCommunicator;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected ClusterService clusterService;
132
Madan Jampani312a2982014-10-14 21:07:16 -0700133 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
134 @Override
135 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700136 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800137 .register(DistributedStoreSerializers.STORE_COMMON)
138 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700139 .register(InternalHostEvent.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700140 .register(InternalHostRemovedEvent.class)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700141 .register(HostFragmentId.class)
142 .register(HostAntiEntropyAdvertisement.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800143 .build();
Madan Jampani312a2982014-10-14 21:07:16 -0700144 }
145 };
146
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800147 private ExecutorService executor;
148
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800149 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700150
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800151 // TODO: Make these anti-entropy params configurable
152 private long initialDelaySec = 5;
153 private long periodSec = 5;
154
Madan Jampaniab994042014-10-13 15:34:04 -0700155 @Activate
156 public void activate() {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700157
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800158 executor = newCachedThreadPool(groupedThreads("onos/host", "fg-%d"));
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800159
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800160 backgroundExecutor =
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800161 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/host", "bg-%d")));
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700162
Madan Jampani2af244a2015-02-22 13:12:01 -0800163 clusterCommunicator.addSubscriber(
164 HOST_UPDATED_MSG,
165 new InternalHostEventListener(), executor);
166 clusterCommunicator.addSubscriber(
167 HOST_REMOVED_MSG,
168 new InternalHostRemovedEventListener(), executor);
169 clusterCommunicator.addSubscriber(
170 HOST_ANTI_ENTROPY_ADVERTISEMENT,
171 new InternalHostAntiEntropyAdvertisementListener(), backgroundExecutor);
172
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700173 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800174 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700175 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani255618f2014-10-14 23:27:57 -0700176
Madan Jampaniab994042014-10-13 15:34:04 -0700177 log.info("Started");
178 }
179
180 @Deactivate
181 public void deactivate() {
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800182 executor.shutdownNow();
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800183 backgroundExecutor.shutdownNow();
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700184 try {
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800185 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700186 log.error("Timeout during executor shutdown");
187 }
188 } catch (InterruptedException e) {
189 log.error("Error during executor shutdown", e);
190 }
191
192 hosts.clear();
193 removedHosts.clear();
194 locations.clear();
195 portAddresses.clear();
196
Madan Jampaniab994042014-10-13 15:34:04 -0700197 log.info("Stopped");
198 }
199
200 @Override
201 public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
202 HostDescription hostDescription) {
203 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700204 HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
205 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800206 log.debug("Notifying peers of a host topology event for providerId: "
Madan Jampani312a2982014-10-14 21:07:16 -0700207 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800208 notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
Madan Jampani312a2982014-10-14 21:07:16 -0700209 }
210 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700211 }
212
213 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
214 HostDescription hostDescription, Timestamp timestamp) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800215 // If this host was previously removed, first ensure
216 // this new request is "newer"
217 if (isHostRemoved(hostId, timestamp)) {
218 log.debug("Ignoring update for removed host {}@{}",
219 hostDescription, timestamp);
220 return null;
221 }
Madan Jampaniab994042014-10-13 15:34:04 -0700222 StoredHost host = hosts.get(hostId);
223 if (host == null) {
224 return createHost(providerId, hostId, hostDescription, timestamp);
225 }
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800226 return updateHost(providerId, hostId, host, hostDescription, timestamp);
227 }
228
229 /**
230 * @param hostId host identifier
231 * @param timestamp timstamp to compare with
232 * @return true if given timestamp is more recent timestamp compared to
233 * the timestamp Host was removed.
234 */
235 private boolean isHostRemoved(HostId hostId, Timestamp timestamp) {
236 Timestamped<Host> removedInfo = removedHosts.get(hostId);
237 if (removedInfo != null) {
Jonathan Hart403ea932015-02-20 16:23:00 -0800238 if (removedInfo.isNewerThan(timestamp)) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800239 return true;
240 }
241 removedHosts.remove(hostId, removedInfo);
242 }
243 return false;
Madan Jampaniab994042014-10-13 15:34:04 -0700244 }
245
246 // creates a new host and sends HOST_ADDED
247 private HostEvent createHost(ProviderId providerId, HostId hostId,
248 HostDescription descr, Timestamp timestamp) {
249 synchronized (this) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800250 StoredHost newhost = new StoredHost(timestamp, providerId, hostId,
Madan Jampaniab994042014-10-13 15:34:04 -0700251 descr.hwAddress(),
252 descr.vlan(),
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800253 descr.location(),
Brian O'Connor6db75772016-01-08 02:03:49 -0800254 ImmutableSet.copyOf(descr.ipAddress()),
255 descr.annotations());
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800256 StoredHost concAdd = hosts.putIfAbsent(hostId, newhost);
257 if (concAdd != null) {
258 // concurrent add detected, retry from start
259 return updateHost(providerId, hostId, concAdd, descr, timestamp);
260 }
Madan Jampaniab994042014-10-13 15:34:04 -0700261 locations.put(descr.location(), newhost);
262 return new HostEvent(HOST_ADDED, newhost);
263 }
264 }
265
266 // checks for type of update to host, sends appropriate event
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800267 private HostEvent updateHost(ProviderId providerId, HostId hostId, StoredHost oldHost,
Madan Jampaniab994042014-10-13 15:34:04 -0700268 HostDescription descr, Timestamp timestamp) {
Madan Jampaniab994042014-10-13 15:34:04 -0700269
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800270 if (timestamp.compareTo(oldHost.timestamp()) < 0) {
271 // new timestamp is older
272 log.debug("Ignoring outdated host update {}@{}", descr, timestamp);
Madan Jampaniab994042014-10-13 15:34:04 -0700273 return null;
274 }
275
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800276 final boolean hostMoved = !oldHost.location().equals(descr.location());
277 if (hostMoved ||
278 !oldHost.ipAddresses().containsAll(descr.ipAddress()) ||
279 !descr.annotations().keys().isEmpty()) {
280
281 Set<IpAddress> addresses = new HashSet<>(oldHost.ipAddresses());
282 addresses.addAll(descr.ipAddress());
283 Annotations annotations = merge((DefaultAnnotations) oldHost.annotations(),
284 descr.annotations());
285
286 Timestamp newTimestamp = timestamp;
287 // if merged Set/Annotation differ from description...
288 final boolean deltaUpdate = !descr.ipAddress().equals(addresses) ||
289 !AnnotationsUtil.isEqual(descr.annotations(), annotations);
290 if (deltaUpdate) {
291 // ..then local existing info had something description didn't
292 newTimestamp = hostClockService.getTimestamp(hostId);
293 log.debug("delta update detected on {}, substepping timestamp to {}",
294 hostId, newTimestamp);
295 }
296
297 StoredHost updated = new StoredHost(newTimestamp,
298 providerId, oldHost.id(),
299 oldHost.mac(), oldHost.vlan(),
300 descr.location(),
301 addresses,
302 annotations);
303 synchronized (this) {
304 boolean replaced = hosts.replace(hostId, oldHost, updated);
305 if (!replaced) {
306 // concurrent update, retry
307 return createOrUpdateHostInternal(providerId, hostId, descr, timestamp);
308 }
309 locations.remove(oldHost.location(), oldHost);
310 locations.put(updated.location(), updated);
311
312 HostEvent.Type eventType;
313 if (hostMoved) {
314 eventType = Type.HOST_MOVED;
315 } else {
316 eventType = Type.HOST_UPDATED;
317 }
318 return new HostEvent(eventType, updated);
319 }
Madan Jampaniab994042014-10-13 15:34:04 -0700320 }
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800321 return null;
Madan Jampaniab994042014-10-13 15:34:04 -0700322 }
323
324 @Override
325 public HostEvent removeHost(HostId hostId) {
326 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700327 HostEvent event = removeHostInternal(hostId, timestamp);
328 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800329 log.debug("Notifying peers of a host removed topology event for hostId: {}", hostId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800330 notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
Madan Jampani312a2982014-10-14 21:07:16 -0700331 }
332 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700333 }
334
335 private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
336 synchronized (this) {
337 Host host = hosts.remove(hostId);
338 if (host != null) {
339 locations.remove((host.location()), host);
340 removedHosts.put(hostId, new Timestamped<>(host, timestamp));
341 return new HostEvent(HOST_REMOVED, host);
342 }
343 return null;
344 }
345 }
346
347 @Override
348 public int getHostCount() {
349 return hosts.size();
350 }
351
352 @Override
353 public Iterable<Host> getHosts() {
354 return ImmutableSet.<Host>copyOf(hosts.values());
355 }
356
357 @Override
358 public Host getHost(HostId hostId) {
359 return hosts.get(hostId);
360 }
361
362 @Override
363 public Set<Host> getHosts(VlanId vlanId) {
364 Set<Host> vlanset = new HashSet<>();
365 for (Host h : hosts.values()) {
366 if (h.vlan().equals(vlanId)) {
367 vlanset.add(h);
368 }
369 }
370 return vlanset;
371 }
372
373 @Override
374 public Set<Host> getHosts(MacAddress mac) {
375 Set<Host> macset = new HashSet<>();
376 for (Host h : hosts.values()) {
377 if (h.mac().equals(mac)) {
378 macset.add(h);
379 }
380 }
381 return macset;
382 }
383
384 @Override
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700385 public Set<Host> getHosts(IpAddress ip) {
Madan Jampaniab994042014-10-13 15:34:04 -0700386 Set<Host> ipset = new HashSet<>();
387 for (Host h : hosts.values()) {
388 if (h.ipAddresses().contains(ip)) {
389 ipset.add(h);
390 }
391 }
392 return ipset;
393 }
394
395 @Override
396 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
397 return ImmutableSet.copyOf(locations.get(connectPoint));
398 }
399
400 @Override
401 public Set<Host> getConnectedHosts(DeviceId deviceId) {
402 Set<Host> hostset = new HashSet<>();
403 for (ConnectPoint p : locations.keySet()) {
404 if (p.deviceId().equals(deviceId)) {
405 hostset.addAll(locations.get(p));
406 }
407 }
408 return hostset;
409 }
410
411 @Override
412 public void updateAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800413 portAddresses.put(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700414 }
415
416 @Override
417 public void removeAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800418 portAddresses.remove(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700419 }
420
421 @Override
422 public void clearAddressBindings(ConnectPoint connectPoint) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800423 portAddresses.removeAll(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700424 }
425
426 @Override
427 public Set<PortAddresses> getAddressBindings() {
428 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800429 return ImmutableSet.copyOf(portAddresses.values());
Madan Jampaniab994042014-10-13 15:34:04 -0700430 }
431 }
432
433 @Override
Jonathan Harta887ba82014-11-03 15:20:52 -0800434 public Set<PortAddresses> getAddressBindingsForPort(ConnectPoint connectPoint) {
Madan Jampaniab994042014-10-13 15:34:04 -0700435 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800436 Set<PortAddresses> addresses = portAddresses.get(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700437
Jonathan Harta887ba82014-11-03 15:20:52 -0800438 if (addresses == null) {
439 return Collections.emptySet();
440 } else {
441 return ImmutableSet.copyOf(addresses);
442 }
Madan Jampaniab994042014-10-13 15:34:04 -0700443 }
Madan Jampaniab994042014-10-13 15:34:04 -0700444 }
445
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700446 private static final class StoredHost extends DefaultHost {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800447 private final Timestamp timestamp;
Madan Jampaniab994042014-10-13 15:34:04 -0700448
449 /**
450 * Creates an end-station host using the supplied information.
451 *
452 * @param providerId provider identity
453 * @param id host identifier
454 * @param mac host MAC address
455 * @param vlan host VLAN identifier
456 * @param location host location
457 * @param ips host IP addresses
458 * @param annotations optional key/value annotations
459 */
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800460 public StoredHost(Timestamp timestamp, ProviderId providerId, HostId id,
461 MacAddress mac, VlanId vlan, HostLocation location,
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700462 Set<IpAddress> ips, Annotations... annotations) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800463 super(providerId, id, mac, vlan, location, ips, annotations);
464 this.timestamp = checkNotNull(timestamp);
Madan Jampaniab994042014-10-13 15:34:04 -0700465 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700466
467 public Timestamp timestamp() {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800468 return timestamp;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700469 }
Madan Jampaniab994042014-10-13 15:34:04 -0700470 }
Madan Jampani312a2982014-10-14 21:07:16 -0700471
Jonathan Hart7d656f42015-01-27 14:07:23 -0800472 private void notifyPeers(InternalHostRemovedEvent event) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800473 broadcastMessage(HOST_REMOVED_MSG, event);
Madan Jampani312a2982014-10-14 21:07:16 -0700474 }
475
Jonathan Hart7d656f42015-01-27 14:07:23 -0800476 private void notifyPeers(InternalHostEvent event) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800477 broadcastMessage(HOST_UPDATED_MSG, event);
Madan Jampani312a2982014-10-14 21:07:16 -0700478 }
479
Jonathan Hart7d656f42015-01-27 14:07:23 -0800480 private void broadcastMessage(MessageSubject subject, Object event) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700481 clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
Madan Jampani312a2982014-10-14 21:07:16 -0700482 }
Madan Jampani255618f2014-10-14 23:27:57 -0700483
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700484 private void unicastMessage(NodeId peer,
485 MessageSubject subject,
486 Object event) throws IOException {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700487 clusterCommunicator.unicast(event, subject, SERIALIZER::encode, peer);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700488 }
489
Madan Jampani255618f2014-10-14 23:27:57 -0700490 private void notifyDelegateIfNotNull(HostEvent event) {
491 if (event != null) {
492 notifyDelegate(event);
493 }
494 }
495
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800496 private final class InternalHostEventListener
497 implements ClusterMessageHandler {
Madan Jampani255618f2014-10-14 23:27:57 -0700498 @Override
499 public void handle(ClusterMessage message) {
500
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800501 log.debug("Received host update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800502 InternalHostEvent event = SERIALIZER.decode(message.payload());
Madan Jampani255618f2014-10-14 23:27:57 -0700503
504 ProviderId providerId = event.providerId();
505 HostId hostId = event.hostId();
506 HostDescription hostDescription = event.hostDescription();
507 Timestamp timestamp = event.timestamp();
508
Madan Jampani2af244a2015-02-22 13:12:01 -0800509 try {
510 notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
511 hostId,
512 hostDescription,
513 timestamp));
514 } catch (Exception e) {
515 log.warn("Exception thrown handling host removed", e);
516 }
Madan Jampani255618f2014-10-14 23:27:57 -0700517 }
518 }
519
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800520 private final class InternalHostRemovedEventListener
521 implements ClusterMessageHandler {
Madan Jampani255618f2014-10-14 23:27:57 -0700522 @Override
523 public void handle(ClusterMessage message) {
524
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800525 log.debug("Received host removed event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800526 InternalHostRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani255618f2014-10-14 23:27:57 -0700527
528 HostId hostId = event.hostId();
529 Timestamp timestamp = event.timestamp();
530
Madan Jampani2af244a2015-02-22 13:12:01 -0800531 try {
532 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
533 } catch (Exception e) {
534 log.warn("Exception thrown handling host removed", e);
535 }
Madan Jampani255618f2014-10-14 23:27:57 -0700536 }
537 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700538
539 private final class SendAdvertisementTask implements Runnable {
540
541 @Override
542 public void run() {
543 if (Thread.currentThread().isInterrupted()) {
544 log.info("Interrupted, quitting");
545 return;
546 }
547
548 try {
549 final NodeId self = clusterService.getLocalNode().id();
550 Set<ControllerNode> nodes = clusterService.getNodes();
551
552 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
553 .transform(toNodeId())
554 .toList();
555
556 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800557 log.trace("No other peers in the cluster.");
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700558 return;
559 }
560
561 NodeId peer;
562 do {
563 int idx = RandomUtils.nextInt(0, nodeIds.size());
564 peer = nodeIds.get(idx);
565 } while (peer.equals(self));
566
567 HostAntiEntropyAdvertisement ad = createAdvertisement();
568
569 if (Thread.currentThread().isInterrupted()) {
570 log.info("Interrupted, quitting");
571 return;
572 }
573
574 try {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800575 unicastMessage(peer, HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700576 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -0700577 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700578 return;
579 }
580 } catch (Exception e) {
581 // catch all Exception to avoid Scheduled task being suppressed.
582 log.error("Exception thrown while sending advertisement", e);
583 }
584 }
585 }
586
587 private HostAntiEntropyAdvertisement createAdvertisement() {
588 final NodeId self = clusterService.getLocalNode().id();
589
590 Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
591 Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
592
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800593 hosts.forEach((hostId, hostInfo) -> {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700594 final ProviderId providerId = hostInfo.providerId();
595 timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800596 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700597
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800598 removedHosts.forEach((hostId, timestamped) -> {
599 tombstones.put(hostId, timestamped.timestamp());
600 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700601
602 return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
603 }
604
605 private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
606
607 final NodeId sender = ad.sender();
608
609 for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
610 // for each locally live Hosts...
611 final HostId hostId = host.getKey();
612 final StoredHost localHost = host.getValue();
613 final ProviderId providerId = localHost.providerId();
614 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
615 final Timestamp localLiveTimestamp = localHost.timestamp();
616
617 Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
618 if (remoteTimestamp == null) {
619 remoteTimestamp = ad.tombstones().get(hostId);
620 }
621 if (remoteTimestamp == null ||
622 localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
623
624 // local is more recent, push
625 // TODO: annotation is lost
626 final HostDescription desc = new DefaultHostDescription(
627 localHost.mac(),
628 localHost.vlan(),
629 localHost.location(),
630 localHost.ipAddresses());
631 try {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800632 unicastMessage(sender, HOST_UPDATED_MSG,
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700633 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
634 } catch (IOException e1) {
635 log.debug("Failed to send advertisement response", e1);
636 }
637 }
638
639 final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
640 if (remoteDeadTimestamp != null &&
641 remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
642 // sender has recent remove
643 notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
644 }
645 }
646
647 for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
648 // for each locally dead Hosts
649 final HostId hostId = dead.getKey();
650 final Timestamp localDeadTimestamp = dead.getValue().timestamp();
651
652 // TODO: pick proper ProviderId, when supporting multi-provider
653 final ProviderId providerId = dead.getValue().value().providerId();
654 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
655
656 final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
657 if (remoteLiveTimestamp != null &&
658 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
659 // sender has zombie, push
660 try {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800661 unicastMessage(sender, HOST_REMOVED_MSG,
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700662 new InternalHostRemovedEvent(hostId, localDeadTimestamp));
663 } catch (IOException e1) {
664 log.debug("Failed to send advertisement response", e1);
665 }
666 }
667 }
668
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700669 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
670 // for each remote tombstone advertisement...
671 final HostId hostId = e.getKey();
672 final Timestamp adRemoveTimestamp = e.getValue();
673
674 final StoredHost storedHost = hosts.get(hostId);
675 if (storedHost == null) {
676 continue;
677 }
678 if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
679 // sender has recent remove info, locally remove
680 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
681 }
682 }
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800683
684 // if remote ad has something unknown, actively sync
685 for (HostFragmentId key : ad.timestamps().keySet()) {
686 if (!hosts.containsKey(key.hostId())) {
687 HostAntiEntropyAdvertisement myAd = createAdvertisement();
688 try {
689 unicastMessage(sender, HOST_ANTI_ENTROPY_ADVERTISEMENT, myAd);
690 break;
691 } catch (IOException e) {
692 log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
693 }
694 }
695 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700696 }
697
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800698 private final class InternalHostAntiEntropyAdvertisementListener
699 implements ClusterMessageHandler {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700700
701 @Override
702 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800703 log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700704 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800705 try {
706 handleAntiEntropyAdvertisement(advertisement);
707 } catch (Exception e) {
708 log.warn("Exception thrown handling Host advertisements", e);
709 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700710 }
711 }
Madan Jampaniab994042014-10-13 15:34:04 -0700712}