blob: 74236c36c22279cca4b7d586e72c46ef6be3dd5f [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.host.impl;
Madan Jampaniab994042014-10-13 15:34:04 -070017
Jonathan Hart7d656f42015-01-27 14:07:23 -080018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.HashMultimap;
20import com.google.common.collect.ImmutableList;
21import com.google.common.collect.ImmutableSet;
22import com.google.common.collect.Multimap;
23import com.google.common.collect.Multimaps;
24import com.google.common.collect.SetMultimap;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070025import org.apache.commons.lang3.RandomUtils;
Madan Jampaniab994042014-10-13 15:34:04 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
Jonathan Hart7d656f42015-01-27 14:07:23 -080032import org.onlab.packet.IpAddress;
33import org.onlab.packet.MacAddress;
34import org.onlab.packet.VlanId;
35import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080036import org.onosproject.cluster.ClusterService;
37import org.onosproject.cluster.ControllerNode;
38import org.onosproject.cluster.NodeId;
39import org.onosproject.net.Annotations;
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -080040import org.onosproject.net.AnnotationsUtil;
Brian O'Connorabafb502014-12-02 22:26:20 -080041import org.onosproject.net.ConnectPoint;
42import org.onosproject.net.DefaultAnnotations;
43import org.onosproject.net.DefaultHost;
44import org.onosproject.net.DeviceId;
45import org.onosproject.net.Host;
46import org.onosproject.net.HostId;
47import org.onosproject.net.HostLocation;
48import org.onosproject.net.host.DefaultHostDescription;
49import org.onosproject.net.host.HostClockService;
50import org.onosproject.net.host.HostDescription;
51import org.onosproject.net.host.HostEvent;
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -080052import org.onosproject.net.host.HostEvent.Type;
Brian O'Connorabafb502014-12-02 22:26:20 -080053import org.onosproject.net.host.HostStore;
54import org.onosproject.net.host.HostStoreDelegate;
55import org.onosproject.net.host.PortAddresses;
56import org.onosproject.net.provider.ProviderId;
57import org.onosproject.store.AbstractStore;
58import org.onosproject.store.Timestamp;
59import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60import org.onosproject.store.cluster.messaging.ClusterMessage;
61import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
62import org.onosproject.store.cluster.messaging.MessageSubject;
63import org.onosproject.store.impl.Timestamped;
64import org.onosproject.store.serializers.KryoSerializer;
65import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
Madan Jampaniab994042014-10-13 15:34:04 -070066import org.slf4j.Logger;
67
Jonathan Hart7d656f42015-01-27 14:07:23 -080068import java.io.IOException;
69import java.util.Collections;
70import java.util.HashMap;
71import java.util.HashSet;
72import java.util.Map;
73import java.util.Map.Entry;
74import java.util.Set;
75import java.util.concurrent.ConcurrentHashMap;
76import java.util.concurrent.ExecutorService;
Jonathan Hart7d656f42015-01-27 14:07:23 -080077import java.util.concurrent.ScheduledExecutorService;
78import java.util.concurrent.TimeUnit;
79
80import static com.google.common.base.Preconditions.checkNotNull;
81import static com.google.common.collect.Multimaps.newSetMultimap;
82import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
83import static com.google.common.collect.Sets.newConcurrentHashSet;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080084import static java.util.concurrent.Executors.newCachedThreadPool;
Jonathan Hart7d656f42015-01-27 14:07:23 -080085import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080086import static org.onlab.util.Tools.groupedThreads;
Jonathan Hart7d656f42015-01-27 14:07:23 -080087import static org.onlab.util.Tools.minPriority;
Jonathan Hart7d656f42015-01-27 14:07:23 -080088import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
89import static org.onosproject.net.DefaultAnnotations.merge;
90import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
91import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080092import static org.onosproject.store.host.impl.GossipHostStoreMessageSubjects.*;
Jonathan Hart7d656f42015-01-27 14:07:23 -080093import static org.slf4j.LoggerFactory.getLogger;
Madan Jampaniab994042014-10-13 15:34:04 -070094
Madan Jampaniab994042014-10-13 15:34:04 -070095/**
96 * Manages inventory of end-station hosts in distributed data store
97 * that uses optimistic replication and gossip based techniques.
98 */
99@Component(immediate = true)
100@Service
101public class GossipHostStore
102 extends AbstractStore<HostEvent, HostStoreDelegate>
103 implements HostStore {
104
105 private final Logger log = getLogger(getClass());
106
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800107 // TODO: make this configurable
108 private int hostsExpected = 2000000;
Madan Jampaniab994042014-10-13 15:34:04 -0700109
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800110 // Host inventory
111 private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
112
113 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
Madan Jampaniab994042014-10-13 15:34:04 -0700114
115 // Hosts tracked by their location
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800116 private final Multimap<ConnectPoint, Host> locations
117 = synchronizedSetMultimap(newSetMultimap(new ConcurrentHashMap<>(),
118 () -> newConcurrentHashSet()));
Madan Jampaniab994042014-10-13 15:34:04 -0700119
Jonathan Harta887ba82014-11-03 15:20:52 -0800120 private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
121 Multimaps.synchronizedSetMultimap(
122 HashMultimap.<ConnectPoint, PortAddresses>create());
Madan Jampaniab994042014-10-13 15:34:04 -0700123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected HostClockService hostClockService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected ClusterCommunicationService clusterCommunicator;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected ClusterService clusterService;
132
Madan Jampani312a2982014-10-14 21:07:16 -0700133 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
134 @Override
135 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700136 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800137 .register(DistributedStoreSerializers.STORE_COMMON)
138 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700139 .register(InternalHostEvent.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700140 .register(InternalHostRemovedEvent.class)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700141 .register(HostFragmentId.class)
142 .register(HostAntiEntropyAdvertisement.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800143 .build();
Madan Jampani312a2982014-10-14 21:07:16 -0700144 }
145 };
146
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800147 private ExecutorService executor;
148
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800149 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700150
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800151 // TODO: Make these anti-entropy params configurable
152 private long initialDelaySec = 5;
153 private long periodSec = 5;
154
Madan Jampaniab994042014-10-13 15:34:04 -0700155 @Activate
156 public void activate() {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700157
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800158 executor = newCachedThreadPool(groupedThreads("onos/host", "fg-%d"));
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800159
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800160 backgroundExecutor =
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800161 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/host", "bg-%d")));
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700162
Madan Jampani2af244a2015-02-22 13:12:01 -0800163 clusterCommunicator.addSubscriber(
164 HOST_UPDATED_MSG,
165 new InternalHostEventListener(), executor);
166 clusterCommunicator.addSubscriber(
167 HOST_REMOVED_MSG,
168 new InternalHostRemovedEventListener(), executor);
169 clusterCommunicator.addSubscriber(
170 HOST_ANTI_ENTROPY_ADVERTISEMENT,
171 new InternalHostAntiEntropyAdvertisementListener(), backgroundExecutor);
172
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700173 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800174 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700175 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani255618f2014-10-14 23:27:57 -0700176
Madan Jampaniab994042014-10-13 15:34:04 -0700177 log.info("Started");
178 }
179
180 @Deactivate
181 public void deactivate() {
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800182 executor.shutdownNow();
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800183 backgroundExecutor.shutdownNow();
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700184 try {
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800185 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700186 log.error("Timeout during executor shutdown");
187 }
188 } catch (InterruptedException e) {
189 log.error("Error during executor shutdown", e);
190 }
191
192 hosts.clear();
193 removedHosts.clear();
194 locations.clear();
195 portAddresses.clear();
196
Madan Jampaniab994042014-10-13 15:34:04 -0700197 log.info("Stopped");
198 }
199
200 @Override
201 public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
202 HostDescription hostDescription) {
203 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700204 HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
205 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800206 log.debug("Notifying peers of a host topology event for providerId: "
Madan Jampani312a2982014-10-14 21:07:16 -0700207 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800208 notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
Madan Jampani312a2982014-10-14 21:07:16 -0700209 }
210 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700211 }
212
213 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
214 HostDescription hostDescription, Timestamp timestamp) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800215 // If this host was previously removed, first ensure
216 // this new request is "newer"
217 if (isHostRemoved(hostId, timestamp)) {
218 log.debug("Ignoring update for removed host {}@{}",
219 hostDescription, timestamp);
220 return null;
221 }
Madan Jampaniab994042014-10-13 15:34:04 -0700222 StoredHost host = hosts.get(hostId);
223 if (host == null) {
224 return createHost(providerId, hostId, hostDescription, timestamp);
225 }
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800226 return updateHost(providerId, hostId, host, hostDescription, timestamp);
227 }
228
229 /**
230 * @param hostId host identifier
231 * @param timestamp timstamp to compare with
232 * @return true if given timestamp is more recent timestamp compared to
233 * the timestamp Host was removed.
234 */
235 private boolean isHostRemoved(HostId hostId, Timestamp timestamp) {
236 Timestamped<Host> removedInfo = removedHosts.get(hostId);
237 if (removedInfo != null) {
Jonathan Hart403ea932015-02-20 16:23:00 -0800238 if (removedInfo.isNewerThan(timestamp)) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800239 return true;
240 }
241 removedHosts.remove(hostId, removedInfo);
242 }
243 return false;
Madan Jampaniab994042014-10-13 15:34:04 -0700244 }
245
246 // creates a new host and sends HOST_ADDED
247 private HostEvent createHost(ProviderId providerId, HostId hostId,
248 HostDescription descr, Timestamp timestamp) {
249 synchronized (this) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800250 StoredHost newhost = new StoredHost(timestamp, providerId, hostId,
Madan Jampaniab994042014-10-13 15:34:04 -0700251 descr.hwAddress(),
252 descr.vlan(),
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800253 descr.location(),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700254 ImmutableSet.copyOf(descr.ipAddress()));
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800255 StoredHost concAdd = hosts.putIfAbsent(hostId, newhost);
256 if (concAdd != null) {
257 // concurrent add detected, retry from start
258 return updateHost(providerId, hostId, concAdd, descr, timestamp);
259 }
Madan Jampaniab994042014-10-13 15:34:04 -0700260 locations.put(descr.location(), newhost);
261 return new HostEvent(HOST_ADDED, newhost);
262 }
263 }
264
265 // checks for type of update to host, sends appropriate event
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800266 private HostEvent updateHost(ProviderId providerId, HostId hostId, StoredHost oldHost,
Madan Jampaniab994042014-10-13 15:34:04 -0700267 HostDescription descr, Timestamp timestamp) {
Madan Jampaniab994042014-10-13 15:34:04 -0700268
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800269 if (timestamp.compareTo(oldHost.timestamp()) < 0) {
270 // new timestamp is older
271 log.debug("Ignoring outdated host update {}@{}", descr, timestamp);
Madan Jampaniab994042014-10-13 15:34:04 -0700272 return null;
273 }
274
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800275 final boolean hostMoved = !oldHost.location().equals(descr.location());
276 if (hostMoved ||
277 !oldHost.ipAddresses().containsAll(descr.ipAddress()) ||
278 !descr.annotations().keys().isEmpty()) {
279
280 Set<IpAddress> addresses = new HashSet<>(oldHost.ipAddresses());
281 addresses.addAll(descr.ipAddress());
282 Annotations annotations = merge((DefaultAnnotations) oldHost.annotations(),
283 descr.annotations());
284
285 Timestamp newTimestamp = timestamp;
286 // if merged Set/Annotation differ from description...
287 final boolean deltaUpdate = !descr.ipAddress().equals(addresses) ||
288 !AnnotationsUtil.isEqual(descr.annotations(), annotations);
289 if (deltaUpdate) {
290 // ..then local existing info had something description didn't
291 newTimestamp = hostClockService.getTimestamp(hostId);
292 log.debug("delta update detected on {}, substepping timestamp to {}",
293 hostId, newTimestamp);
294 }
295
296 StoredHost updated = new StoredHost(newTimestamp,
297 providerId, oldHost.id(),
298 oldHost.mac(), oldHost.vlan(),
299 descr.location(),
300 addresses,
301 annotations);
302 synchronized (this) {
303 boolean replaced = hosts.replace(hostId, oldHost, updated);
304 if (!replaced) {
305 // concurrent update, retry
306 return createOrUpdateHostInternal(providerId, hostId, descr, timestamp);
307 }
308 locations.remove(oldHost.location(), oldHost);
309 locations.put(updated.location(), updated);
310
311 HostEvent.Type eventType;
312 if (hostMoved) {
313 eventType = Type.HOST_MOVED;
314 } else {
315 eventType = Type.HOST_UPDATED;
316 }
317 return new HostEvent(eventType, updated);
318 }
Madan Jampaniab994042014-10-13 15:34:04 -0700319 }
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800320 return null;
Madan Jampaniab994042014-10-13 15:34:04 -0700321 }
322
323 @Override
324 public HostEvent removeHost(HostId hostId) {
325 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700326 HostEvent event = removeHostInternal(hostId, timestamp);
327 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800328 log.debug("Notifying peers of a host removed topology event for hostId: {}", hostId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800329 notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
Madan Jampani312a2982014-10-14 21:07:16 -0700330 }
331 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700332 }
333
334 private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
335 synchronized (this) {
336 Host host = hosts.remove(hostId);
337 if (host != null) {
338 locations.remove((host.location()), host);
339 removedHosts.put(hostId, new Timestamped<>(host, timestamp));
340 return new HostEvent(HOST_REMOVED, host);
341 }
342 return null;
343 }
344 }
345
346 @Override
347 public int getHostCount() {
348 return hosts.size();
349 }
350
351 @Override
352 public Iterable<Host> getHosts() {
353 return ImmutableSet.<Host>copyOf(hosts.values());
354 }
355
356 @Override
357 public Host getHost(HostId hostId) {
358 return hosts.get(hostId);
359 }
360
361 @Override
362 public Set<Host> getHosts(VlanId vlanId) {
363 Set<Host> vlanset = new HashSet<>();
364 for (Host h : hosts.values()) {
365 if (h.vlan().equals(vlanId)) {
366 vlanset.add(h);
367 }
368 }
369 return vlanset;
370 }
371
372 @Override
373 public Set<Host> getHosts(MacAddress mac) {
374 Set<Host> macset = new HashSet<>();
375 for (Host h : hosts.values()) {
376 if (h.mac().equals(mac)) {
377 macset.add(h);
378 }
379 }
380 return macset;
381 }
382
383 @Override
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700384 public Set<Host> getHosts(IpAddress ip) {
Madan Jampaniab994042014-10-13 15:34:04 -0700385 Set<Host> ipset = new HashSet<>();
386 for (Host h : hosts.values()) {
387 if (h.ipAddresses().contains(ip)) {
388 ipset.add(h);
389 }
390 }
391 return ipset;
392 }
393
394 @Override
395 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
396 return ImmutableSet.copyOf(locations.get(connectPoint));
397 }
398
399 @Override
400 public Set<Host> getConnectedHosts(DeviceId deviceId) {
401 Set<Host> hostset = new HashSet<>();
402 for (ConnectPoint p : locations.keySet()) {
403 if (p.deviceId().equals(deviceId)) {
404 hostset.addAll(locations.get(p));
405 }
406 }
407 return hostset;
408 }
409
410 @Override
411 public void updateAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800412 portAddresses.put(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700413 }
414
415 @Override
416 public void removeAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800417 portAddresses.remove(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700418 }
419
420 @Override
421 public void clearAddressBindings(ConnectPoint connectPoint) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800422 portAddresses.removeAll(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700423 }
424
425 @Override
426 public Set<PortAddresses> getAddressBindings() {
427 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800428 return ImmutableSet.copyOf(portAddresses.values());
Madan Jampaniab994042014-10-13 15:34:04 -0700429 }
430 }
431
432 @Override
Jonathan Harta887ba82014-11-03 15:20:52 -0800433 public Set<PortAddresses> getAddressBindingsForPort(ConnectPoint connectPoint) {
Madan Jampaniab994042014-10-13 15:34:04 -0700434 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800435 Set<PortAddresses> addresses = portAddresses.get(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700436
Jonathan Harta887ba82014-11-03 15:20:52 -0800437 if (addresses == null) {
438 return Collections.emptySet();
439 } else {
440 return ImmutableSet.copyOf(addresses);
441 }
Madan Jampaniab994042014-10-13 15:34:04 -0700442 }
Madan Jampaniab994042014-10-13 15:34:04 -0700443 }
444
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700445 private static final class StoredHost extends DefaultHost {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800446 private final Timestamp timestamp;
Madan Jampaniab994042014-10-13 15:34:04 -0700447
448 /**
449 * Creates an end-station host using the supplied information.
450 *
451 * @param providerId provider identity
452 * @param id host identifier
453 * @param mac host MAC address
454 * @param vlan host VLAN identifier
455 * @param location host location
456 * @param ips host IP addresses
457 * @param annotations optional key/value annotations
458 */
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800459 public StoredHost(Timestamp timestamp, ProviderId providerId, HostId id,
460 MacAddress mac, VlanId vlan, HostLocation location,
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700461 Set<IpAddress> ips, Annotations... annotations) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800462 super(providerId, id, mac, vlan, location, ips, annotations);
463 this.timestamp = checkNotNull(timestamp);
Madan Jampaniab994042014-10-13 15:34:04 -0700464 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700465
466 public Timestamp timestamp() {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800467 return timestamp;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700468 }
Madan Jampaniab994042014-10-13 15:34:04 -0700469 }
Madan Jampani312a2982014-10-14 21:07:16 -0700470
Jonathan Hart7d656f42015-01-27 14:07:23 -0800471 private void notifyPeers(InternalHostRemovedEvent event) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800472 broadcastMessage(HOST_REMOVED_MSG, event);
Madan Jampani312a2982014-10-14 21:07:16 -0700473 }
474
Jonathan Hart7d656f42015-01-27 14:07:23 -0800475 private void notifyPeers(InternalHostEvent event) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800476 broadcastMessage(HOST_UPDATED_MSG, event);
Madan Jampani312a2982014-10-14 21:07:16 -0700477 }
478
Jonathan Hart7d656f42015-01-27 14:07:23 -0800479 private void broadcastMessage(MessageSubject subject, Object event) {
Madan Jampani312a2982014-10-14 21:07:16 -0700480 ClusterMessage message = new ClusterMessage(
481 clusterService.getLocalNode().id(),
482 subject,
483 SERIALIZER.encode(event));
484 clusterCommunicator.broadcast(message);
485 }
Madan Jampani255618f2014-10-14 23:27:57 -0700486
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700487 private void unicastMessage(NodeId peer,
488 MessageSubject subject,
489 Object event) throws IOException {
490 ClusterMessage message = new ClusterMessage(
491 clusterService.getLocalNode().id(),
492 subject,
493 SERIALIZER.encode(event));
494 clusterCommunicator.unicast(message, peer);
495 }
496
Madan Jampani255618f2014-10-14 23:27:57 -0700497 private void notifyDelegateIfNotNull(HostEvent event) {
498 if (event != null) {
499 notifyDelegate(event);
500 }
501 }
502
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800503 private final class InternalHostEventListener
504 implements ClusterMessageHandler {
Madan Jampani255618f2014-10-14 23:27:57 -0700505 @Override
506 public void handle(ClusterMessage message) {
507
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800508 log.debug("Received host update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800509 InternalHostEvent event = SERIALIZER.decode(message.payload());
Madan Jampani255618f2014-10-14 23:27:57 -0700510
511 ProviderId providerId = event.providerId();
512 HostId hostId = event.hostId();
513 HostDescription hostDescription = event.hostDescription();
514 Timestamp timestamp = event.timestamp();
515
Madan Jampani2af244a2015-02-22 13:12:01 -0800516 try {
517 notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
518 hostId,
519 hostDescription,
520 timestamp));
521 } catch (Exception e) {
522 log.warn("Exception thrown handling host removed", e);
523 }
Madan Jampani255618f2014-10-14 23:27:57 -0700524 }
525 }
526
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800527 private final class InternalHostRemovedEventListener
528 implements ClusterMessageHandler {
Madan Jampani255618f2014-10-14 23:27:57 -0700529 @Override
530 public void handle(ClusterMessage message) {
531
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800532 log.debug("Received host removed event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800533 InternalHostRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani255618f2014-10-14 23:27:57 -0700534
535 HostId hostId = event.hostId();
536 Timestamp timestamp = event.timestamp();
537
Madan Jampani2af244a2015-02-22 13:12:01 -0800538 try {
539 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
540 } catch (Exception e) {
541 log.warn("Exception thrown handling host removed", e);
542 }
Madan Jampani255618f2014-10-14 23:27:57 -0700543 }
544 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700545
546 private final class SendAdvertisementTask implements Runnable {
547
548 @Override
549 public void run() {
550 if (Thread.currentThread().isInterrupted()) {
551 log.info("Interrupted, quitting");
552 return;
553 }
554
555 try {
556 final NodeId self = clusterService.getLocalNode().id();
557 Set<ControllerNode> nodes = clusterService.getNodes();
558
559 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
560 .transform(toNodeId())
561 .toList();
562
563 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800564 log.trace("No other peers in the cluster.");
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700565 return;
566 }
567
568 NodeId peer;
569 do {
570 int idx = RandomUtils.nextInt(0, nodeIds.size());
571 peer = nodeIds.get(idx);
572 } while (peer.equals(self));
573
574 HostAntiEntropyAdvertisement ad = createAdvertisement();
575
576 if (Thread.currentThread().isInterrupted()) {
577 log.info("Interrupted, quitting");
578 return;
579 }
580
581 try {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800582 unicastMessage(peer, HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700583 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -0700584 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700585 return;
586 }
587 } catch (Exception e) {
588 // catch all Exception to avoid Scheduled task being suppressed.
589 log.error("Exception thrown while sending advertisement", e);
590 }
591 }
592 }
593
594 private HostAntiEntropyAdvertisement createAdvertisement() {
595 final NodeId self = clusterService.getLocalNode().id();
596
597 Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
598 Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
599
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800600 hosts.forEach((hostId, hostInfo) -> {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700601 final ProviderId providerId = hostInfo.providerId();
602 timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800603 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700604
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800605 removedHosts.forEach((hostId, timestamped) -> {
606 tombstones.put(hostId, timestamped.timestamp());
607 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700608
609 return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
610 }
611
612 private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
613
614 final NodeId sender = ad.sender();
615
616 for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
617 // for each locally live Hosts...
618 final HostId hostId = host.getKey();
619 final StoredHost localHost = host.getValue();
620 final ProviderId providerId = localHost.providerId();
621 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
622 final Timestamp localLiveTimestamp = localHost.timestamp();
623
624 Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
625 if (remoteTimestamp == null) {
626 remoteTimestamp = ad.tombstones().get(hostId);
627 }
628 if (remoteTimestamp == null ||
629 localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
630
631 // local is more recent, push
632 // TODO: annotation is lost
633 final HostDescription desc = new DefaultHostDescription(
634 localHost.mac(),
635 localHost.vlan(),
636 localHost.location(),
637 localHost.ipAddresses());
638 try {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800639 unicastMessage(sender, HOST_UPDATED_MSG,
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700640 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
641 } catch (IOException e1) {
642 log.debug("Failed to send advertisement response", e1);
643 }
644 }
645
646 final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
647 if (remoteDeadTimestamp != null &&
648 remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
649 // sender has recent remove
650 notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
651 }
652 }
653
654 for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
655 // for each locally dead Hosts
656 final HostId hostId = dead.getKey();
657 final Timestamp localDeadTimestamp = dead.getValue().timestamp();
658
659 // TODO: pick proper ProviderId, when supporting multi-provider
660 final ProviderId providerId = dead.getValue().value().providerId();
661 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
662
663 final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
664 if (remoteLiveTimestamp != null &&
665 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
666 // sender has zombie, push
667 try {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800668 unicastMessage(sender, HOST_REMOVED_MSG,
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700669 new InternalHostRemovedEvent(hostId, localDeadTimestamp));
670 } catch (IOException e1) {
671 log.debug("Failed to send advertisement response", e1);
672 }
673 }
674 }
675
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700676 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
677 // for each remote tombstone advertisement...
678 final HostId hostId = e.getKey();
679 final Timestamp adRemoveTimestamp = e.getValue();
680
681 final StoredHost storedHost = hosts.get(hostId);
682 if (storedHost == null) {
683 continue;
684 }
685 if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
686 // sender has recent remove info, locally remove
687 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
688 }
689 }
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800690
691 // if remote ad has something unknown, actively sync
692 for (HostFragmentId key : ad.timestamps().keySet()) {
693 if (!hosts.containsKey(key.hostId())) {
694 HostAntiEntropyAdvertisement myAd = createAdvertisement();
695 try {
696 unicastMessage(sender, HOST_ANTI_ENTROPY_ADVERTISEMENT, myAd);
697 break;
698 } catch (IOException e) {
699 log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
700 }
701 }
702 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700703 }
704
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800705 private final class InternalHostAntiEntropyAdvertisementListener
706 implements ClusterMessageHandler {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700707
708 @Override
709 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800710 log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700711 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800712 try {
713 handleAntiEntropyAdvertisement(advertisement);
714 } catch (Exception e) {
715 log.warn("Exception thrown handling Host advertisements", e);
716 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700717 }
718 }
Madan Jampaniab994042014-10-13 15:34:04 -0700719}