blob: b2e4b97cf357f6d71f7f6b92847da3250e3c38e9 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.host.impl;
Madan Jampaniab994042014-10-13 15:34:04 -070017
Jonathan Hart7d656f42015-01-27 14:07:23 -080018import com.google.common.collect.FluentIterable;
19import com.google.common.collect.HashMultimap;
20import com.google.common.collect.ImmutableList;
21import com.google.common.collect.ImmutableSet;
22import com.google.common.collect.Multimap;
23import com.google.common.collect.Multimaps;
24import com.google.common.collect.SetMultimap;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070025import org.apache.commons.lang3.RandomUtils;
Madan Jampaniab994042014-10-13 15:34:04 -070026import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
Jonathan Hart7d656f42015-01-27 14:07:23 -080032import org.onlab.packet.IpAddress;
33import org.onlab.packet.MacAddress;
34import org.onlab.packet.VlanId;
35import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080036import org.onosproject.cluster.ClusterService;
37import org.onosproject.cluster.ControllerNode;
38import org.onosproject.cluster.NodeId;
39import org.onosproject.net.Annotations;
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -080040import org.onosproject.net.AnnotationsUtil;
Brian O'Connorabafb502014-12-02 22:26:20 -080041import org.onosproject.net.ConnectPoint;
42import org.onosproject.net.DefaultAnnotations;
43import org.onosproject.net.DefaultHost;
44import org.onosproject.net.DeviceId;
45import org.onosproject.net.Host;
46import org.onosproject.net.HostId;
47import org.onosproject.net.HostLocation;
48import org.onosproject.net.host.DefaultHostDescription;
49import org.onosproject.net.host.HostClockService;
50import org.onosproject.net.host.HostDescription;
51import org.onosproject.net.host.HostEvent;
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -080052import org.onosproject.net.host.HostEvent.Type;
Brian O'Connorabafb502014-12-02 22:26:20 -080053import org.onosproject.net.host.HostStore;
54import org.onosproject.net.host.HostStoreDelegate;
55import org.onosproject.net.host.PortAddresses;
56import org.onosproject.net.provider.ProviderId;
57import org.onosproject.store.AbstractStore;
58import org.onosproject.store.Timestamp;
59import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60import org.onosproject.store.cluster.messaging.ClusterMessage;
61import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
62import org.onosproject.store.cluster.messaging.MessageSubject;
63import org.onosproject.store.impl.Timestamped;
64import org.onosproject.store.serializers.KryoSerializer;
Brian O'Connor6de2e202015-05-21 14:30:41 -070065import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
Madan Jampaniab994042014-10-13 15:34:04 -070066import org.slf4j.Logger;
67
Jonathan Hart7d656f42015-01-27 14:07:23 -080068import java.io.IOException;
69import java.util.Collections;
70import java.util.HashMap;
71import java.util.HashSet;
72import java.util.Map;
73import java.util.Map.Entry;
74import java.util.Set;
75import java.util.concurrent.ConcurrentHashMap;
76import java.util.concurrent.ExecutorService;
Jonathan Hart7d656f42015-01-27 14:07:23 -080077import java.util.concurrent.ScheduledExecutorService;
78import java.util.concurrent.TimeUnit;
79
80import static com.google.common.base.Preconditions.checkNotNull;
81import static com.google.common.collect.Multimaps.newSetMultimap;
82import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
83import static com.google.common.collect.Sets.newConcurrentHashSet;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080084import static java.util.concurrent.Executors.newCachedThreadPool;
Jonathan Hart7d656f42015-01-27 14:07:23 -080085import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080086import static org.onlab.util.Tools.groupedThreads;
Jonathan Hart7d656f42015-01-27 14:07:23 -080087import static org.onlab.util.Tools.minPriority;
Jonathan Hart7d656f42015-01-27 14:07:23 -080088import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
89import static org.onosproject.net.DefaultAnnotations.merge;
90import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
91import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080092import static org.onosproject.store.host.impl.GossipHostStoreMessageSubjects.*;
Jonathan Hart7d656f42015-01-27 14:07:23 -080093import static org.slf4j.LoggerFactory.getLogger;
Madan Jampaniab994042014-10-13 15:34:04 -070094
Madan Jampaniab994042014-10-13 15:34:04 -070095/**
96 * Manages inventory of end-station hosts in distributed data store
97 * that uses optimistic replication and gossip based techniques.
98 */
99@Component(immediate = true)
100@Service
101public class GossipHostStore
102 extends AbstractStore<HostEvent, HostStoreDelegate>
103 implements HostStore {
104
105 private final Logger log = getLogger(getClass());
106
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800107 // TODO: make this configurable
108 private int hostsExpected = 2000000;
Madan Jampaniab994042014-10-13 15:34:04 -0700109
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800110 // Host inventory
111 private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
112
113 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
Madan Jampaniab994042014-10-13 15:34:04 -0700114
115 // Hosts tracked by their location
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800116 private final Multimap<ConnectPoint, Host> locations
117 = synchronizedSetMultimap(newSetMultimap(new ConcurrentHashMap<>(),
118 () -> newConcurrentHashSet()));
Madan Jampaniab994042014-10-13 15:34:04 -0700119
Jonathan Harta887ba82014-11-03 15:20:52 -0800120 private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
121 Multimaps.synchronizedSetMultimap(
122 HashMultimap.<ConnectPoint, PortAddresses>create());
Madan Jampaniab994042014-10-13 15:34:04 -0700123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected HostClockService hostClockService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected ClusterCommunicationService clusterCommunicator;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected ClusterService clusterService;
132
Madan Jampani312a2982014-10-14 21:07:16 -0700133 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
134 @Override
135 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700136 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800137 .register(DistributedStoreSerializers.STORE_COMMON)
138 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700139 .register(InternalHostEvent.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700140 .register(InternalHostRemovedEvent.class)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700141 .register(HostFragmentId.class)
142 .register(HostAntiEntropyAdvertisement.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800143 .build();
Madan Jampani312a2982014-10-14 21:07:16 -0700144 }
145 };
146
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800147 private ExecutorService executor;
148
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800149 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700150
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800151 // TODO: Make these anti-entropy params configurable
152 private long initialDelaySec = 5;
153 private long periodSec = 5;
154
Madan Jampaniab994042014-10-13 15:34:04 -0700155 @Activate
156 public void activate() {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700157
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800158 executor = newCachedThreadPool(groupedThreads("onos/host", "fg-%d"));
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800159
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800160 backgroundExecutor =
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800161 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/host", "bg-%d")));
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700162
Madan Jampani2af244a2015-02-22 13:12:01 -0800163 clusterCommunicator.addSubscriber(
164 HOST_UPDATED_MSG,
165 new InternalHostEventListener(), executor);
166 clusterCommunicator.addSubscriber(
167 HOST_REMOVED_MSG,
168 new InternalHostRemovedEventListener(), executor);
169 clusterCommunicator.addSubscriber(
170 HOST_ANTI_ENTROPY_ADVERTISEMENT,
171 new InternalHostAntiEntropyAdvertisementListener(), backgroundExecutor);
172
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700173 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800174 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700175 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani255618f2014-10-14 23:27:57 -0700176
Madan Jampaniab994042014-10-13 15:34:04 -0700177 log.info("Started");
178 }
179
180 @Deactivate
181 public void deactivate() {
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800182 executor.shutdownNow();
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800183 backgroundExecutor.shutdownNow();
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700184 try {
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800185 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700186 log.error("Timeout during executor shutdown");
187 }
188 } catch (InterruptedException e) {
189 log.error("Error during executor shutdown", e);
190 }
191
192 hosts.clear();
193 removedHosts.clear();
194 locations.clear();
195 portAddresses.clear();
196
Madan Jampaniab994042014-10-13 15:34:04 -0700197 log.info("Stopped");
198 }
199
200 @Override
201 public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
202 HostDescription hostDescription) {
203 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700204 HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
205 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800206 log.debug("Notifying peers of a host topology event for providerId: "
Madan Jampani312a2982014-10-14 21:07:16 -0700207 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800208 notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
Madan Jampani312a2982014-10-14 21:07:16 -0700209 }
210 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700211 }
212
213 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
214 HostDescription hostDescription, Timestamp timestamp) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800215 // If this host was previously removed, first ensure
216 // this new request is "newer"
217 if (isHostRemoved(hostId, timestamp)) {
218 log.debug("Ignoring update for removed host {}@{}",
219 hostDescription, timestamp);
220 return null;
221 }
Madan Jampaniab994042014-10-13 15:34:04 -0700222 StoredHost host = hosts.get(hostId);
223 if (host == null) {
224 return createHost(providerId, hostId, hostDescription, timestamp);
225 }
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800226 return updateHost(providerId, hostId, host, hostDescription, timestamp);
227 }
228
229 /**
230 * @param hostId host identifier
231 * @param timestamp timstamp to compare with
232 * @return true if given timestamp is more recent timestamp compared to
233 * the timestamp Host was removed.
234 */
235 private boolean isHostRemoved(HostId hostId, Timestamp timestamp) {
236 Timestamped<Host> removedInfo = removedHosts.get(hostId);
237 if (removedInfo != null) {
Jonathan Hart403ea932015-02-20 16:23:00 -0800238 if (removedInfo.isNewerThan(timestamp)) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800239 return true;
240 }
241 removedHosts.remove(hostId, removedInfo);
242 }
243 return false;
Madan Jampaniab994042014-10-13 15:34:04 -0700244 }
245
246 // creates a new host and sends HOST_ADDED
247 private HostEvent createHost(ProviderId providerId, HostId hostId,
248 HostDescription descr, Timestamp timestamp) {
249 synchronized (this) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800250 StoredHost newhost = new StoredHost(timestamp, providerId, hostId,
Madan Jampaniab994042014-10-13 15:34:04 -0700251 descr.hwAddress(),
252 descr.vlan(),
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800253 descr.location(),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700254 ImmutableSet.copyOf(descr.ipAddress()));
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800255 StoredHost concAdd = hosts.putIfAbsent(hostId, newhost);
256 if (concAdd != null) {
257 // concurrent add detected, retry from start
258 return updateHost(providerId, hostId, concAdd, descr, timestamp);
259 }
Madan Jampaniab994042014-10-13 15:34:04 -0700260 locations.put(descr.location(), newhost);
261 return new HostEvent(HOST_ADDED, newhost);
262 }
263 }
264
265 // checks for type of update to host, sends appropriate event
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800266 private HostEvent updateHost(ProviderId providerId, HostId hostId, StoredHost oldHost,
Madan Jampaniab994042014-10-13 15:34:04 -0700267 HostDescription descr, Timestamp timestamp) {
Madan Jampaniab994042014-10-13 15:34:04 -0700268
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800269 if (timestamp.compareTo(oldHost.timestamp()) < 0) {
270 // new timestamp is older
271 log.debug("Ignoring outdated host update {}@{}", descr, timestamp);
Madan Jampaniab994042014-10-13 15:34:04 -0700272 return null;
273 }
274
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800275 final boolean hostMoved = !oldHost.location().equals(descr.location());
276 if (hostMoved ||
277 !oldHost.ipAddresses().containsAll(descr.ipAddress()) ||
278 !descr.annotations().keys().isEmpty()) {
279
280 Set<IpAddress> addresses = new HashSet<>(oldHost.ipAddresses());
281 addresses.addAll(descr.ipAddress());
282 Annotations annotations = merge((DefaultAnnotations) oldHost.annotations(),
283 descr.annotations());
284
285 Timestamp newTimestamp = timestamp;
286 // if merged Set/Annotation differ from description...
287 final boolean deltaUpdate = !descr.ipAddress().equals(addresses) ||
288 !AnnotationsUtil.isEqual(descr.annotations(), annotations);
289 if (deltaUpdate) {
290 // ..then local existing info had something description didn't
291 newTimestamp = hostClockService.getTimestamp(hostId);
292 log.debug("delta update detected on {}, substepping timestamp to {}",
293 hostId, newTimestamp);
294 }
295
296 StoredHost updated = new StoredHost(newTimestamp,
297 providerId, oldHost.id(),
298 oldHost.mac(), oldHost.vlan(),
299 descr.location(),
300 addresses,
301 annotations);
302 synchronized (this) {
303 boolean replaced = hosts.replace(hostId, oldHost, updated);
304 if (!replaced) {
305 // concurrent update, retry
306 return createOrUpdateHostInternal(providerId, hostId, descr, timestamp);
307 }
308 locations.remove(oldHost.location(), oldHost);
309 locations.put(updated.location(), updated);
310
311 HostEvent.Type eventType;
312 if (hostMoved) {
313 eventType = Type.HOST_MOVED;
314 } else {
315 eventType = Type.HOST_UPDATED;
316 }
317 return new HostEvent(eventType, updated);
318 }
Madan Jampaniab994042014-10-13 15:34:04 -0700319 }
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800320 return null;
Madan Jampaniab994042014-10-13 15:34:04 -0700321 }
322
323 @Override
324 public HostEvent removeHost(HostId hostId) {
325 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700326 HostEvent event = removeHostInternal(hostId, timestamp);
327 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800328 log.debug("Notifying peers of a host removed topology event for hostId: {}", hostId);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800329 notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
Madan Jampani312a2982014-10-14 21:07:16 -0700330 }
331 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700332 }
333
334 private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
335 synchronized (this) {
336 Host host = hosts.remove(hostId);
337 if (host != null) {
338 locations.remove((host.location()), host);
339 removedHosts.put(hostId, new Timestamped<>(host, timestamp));
340 return new HostEvent(HOST_REMOVED, host);
341 }
342 return null;
343 }
344 }
345
346 @Override
347 public int getHostCount() {
348 return hosts.size();
349 }
350
351 @Override
352 public Iterable<Host> getHosts() {
353 return ImmutableSet.<Host>copyOf(hosts.values());
354 }
355
356 @Override
357 public Host getHost(HostId hostId) {
358 return hosts.get(hostId);
359 }
360
361 @Override
362 public Set<Host> getHosts(VlanId vlanId) {
363 Set<Host> vlanset = new HashSet<>();
364 for (Host h : hosts.values()) {
365 if (h.vlan().equals(vlanId)) {
366 vlanset.add(h);
367 }
368 }
369 return vlanset;
370 }
371
372 @Override
373 public Set<Host> getHosts(MacAddress mac) {
374 Set<Host> macset = new HashSet<>();
375 for (Host h : hosts.values()) {
376 if (h.mac().equals(mac)) {
377 macset.add(h);
378 }
379 }
380 return macset;
381 }
382
383 @Override
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700384 public Set<Host> getHosts(IpAddress ip) {
Madan Jampaniab994042014-10-13 15:34:04 -0700385 Set<Host> ipset = new HashSet<>();
386 for (Host h : hosts.values()) {
387 if (h.ipAddresses().contains(ip)) {
388 ipset.add(h);
389 }
390 }
391 return ipset;
392 }
393
394 @Override
395 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
396 return ImmutableSet.copyOf(locations.get(connectPoint));
397 }
398
399 @Override
400 public Set<Host> getConnectedHosts(DeviceId deviceId) {
401 Set<Host> hostset = new HashSet<>();
402 for (ConnectPoint p : locations.keySet()) {
403 if (p.deviceId().equals(deviceId)) {
404 hostset.addAll(locations.get(p));
405 }
406 }
407 return hostset;
408 }
409
410 @Override
411 public void updateAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800412 portAddresses.put(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700413 }
414
415 @Override
416 public void removeAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800417 portAddresses.remove(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700418 }
419
420 @Override
421 public void clearAddressBindings(ConnectPoint connectPoint) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800422 portAddresses.removeAll(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700423 }
424
425 @Override
426 public Set<PortAddresses> getAddressBindings() {
427 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800428 return ImmutableSet.copyOf(portAddresses.values());
Madan Jampaniab994042014-10-13 15:34:04 -0700429 }
430 }
431
432 @Override
Jonathan Harta887ba82014-11-03 15:20:52 -0800433 public Set<PortAddresses> getAddressBindingsForPort(ConnectPoint connectPoint) {
Madan Jampaniab994042014-10-13 15:34:04 -0700434 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800435 Set<PortAddresses> addresses = portAddresses.get(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700436
Jonathan Harta887ba82014-11-03 15:20:52 -0800437 if (addresses == null) {
438 return Collections.emptySet();
439 } else {
440 return ImmutableSet.copyOf(addresses);
441 }
Madan Jampaniab994042014-10-13 15:34:04 -0700442 }
Madan Jampaniab994042014-10-13 15:34:04 -0700443 }
444
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700445 private static final class StoredHost extends DefaultHost {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800446 private final Timestamp timestamp;
Madan Jampaniab994042014-10-13 15:34:04 -0700447
448 /**
449 * Creates an end-station host using the supplied information.
450 *
451 * @param providerId provider identity
452 * @param id host identifier
453 * @param mac host MAC address
454 * @param vlan host VLAN identifier
455 * @param location host location
456 * @param ips host IP addresses
457 * @param annotations optional key/value annotations
458 */
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800459 public StoredHost(Timestamp timestamp, ProviderId providerId, HostId id,
460 MacAddress mac, VlanId vlan, HostLocation location,
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700461 Set<IpAddress> ips, Annotations... annotations) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800462 super(providerId, id, mac, vlan, location, ips, annotations);
463 this.timestamp = checkNotNull(timestamp);
Madan Jampaniab994042014-10-13 15:34:04 -0700464 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700465
466 public Timestamp timestamp() {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800467 return timestamp;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700468 }
Madan Jampaniab994042014-10-13 15:34:04 -0700469 }
Madan Jampani312a2982014-10-14 21:07:16 -0700470
Jonathan Hart7d656f42015-01-27 14:07:23 -0800471 private void notifyPeers(InternalHostRemovedEvent event) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800472 broadcastMessage(HOST_REMOVED_MSG, event);
Madan Jampani312a2982014-10-14 21:07:16 -0700473 }
474
Jonathan Hart7d656f42015-01-27 14:07:23 -0800475 private void notifyPeers(InternalHostEvent event) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800476 broadcastMessage(HOST_UPDATED_MSG, event);
Madan Jampani312a2982014-10-14 21:07:16 -0700477 }
478
Jonathan Hart7d656f42015-01-27 14:07:23 -0800479 private void broadcastMessage(MessageSubject subject, Object event) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700480 clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
Madan Jampani312a2982014-10-14 21:07:16 -0700481 }
Madan Jampani255618f2014-10-14 23:27:57 -0700482
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700483 private void unicastMessage(NodeId peer,
484 MessageSubject subject,
485 Object event) throws IOException {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700486 clusterCommunicator.unicast(event, subject, SERIALIZER::encode, peer);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700487 }
488
Madan Jampani255618f2014-10-14 23:27:57 -0700489 private void notifyDelegateIfNotNull(HostEvent event) {
490 if (event != null) {
491 notifyDelegate(event);
492 }
493 }
494
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800495 private final class InternalHostEventListener
496 implements ClusterMessageHandler {
Madan Jampani255618f2014-10-14 23:27:57 -0700497 @Override
498 public void handle(ClusterMessage message) {
499
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800500 log.debug("Received host update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800501 InternalHostEvent event = SERIALIZER.decode(message.payload());
Madan Jampani255618f2014-10-14 23:27:57 -0700502
503 ProviderId providerId = event.providerId();
504 HostId hostId = event.hostId();
505 HostDescription hostDescription = event.hostDescription();
506 Timestamp timestamp = event.timestamp();
507
Madan Jampani2af244a2015-02-22 13:12:01 -0800508 try {
509 notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
510 hostId,
511 hostDescription,
512 timestamp));
513 } catch (Exception e) {
514 log.warn("Exception thrown handling host removed", e);
515 }
Madan Jampani255618f2014-10-14 23:27:57 -0700516 }
517 }
518
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800519 private final class InternalHostRemovedEventListener
520 implements ClusterMessageHandler {
Madan Jampani255618f2014-10-14 23:27:57 -0700521 @Override
522 public void handle(ClusterMessage message) {
523
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800524 log.debug("Received host removed event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800525 InternalHostRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani255618f2014-10-14 23:27:57 -0700526
527 HostId hostId = event.hostId();
528 Timestamp timestamp = event.timestamp();
529
Madan Jampani2af244a2015-02-22 13:12:01 -0800530 try {
531 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
532 } catch (Exception e) {
533 log.warn("Exception thrown handling host removed", e);
534 }
Madan Jampani255618f2014-10-14 23:27:57 -0700535 }
536 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700537
538 private final class SendAdvertisementTask implements Runnable {
539
540 @Override
541 public void run() {
542 if (Thread.currentThread().isInterrupted()) {
543 log.info("Interrupted, quitting");
544 return;
545 }
546
547 try {
548 final NodeId self = clusterService.getLocalNode().id();
549 Set<ControllerNode> nodes = clusterService.getNodes();
550
551 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
552 .transform(toNodeId())
553 .toList();
554
555 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800556 log.trace("No other peers in the cluster.");
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700557 return;
558 }
559
560 NodeId peer;
561 do {
562 int idx = RandomUtils.nextInt(0, nodeIds.size());
563 peer = nodeIds.get(idx);
564 } while (peer.equals(self));
565
566 HostAntiEntropyAdvertisement ad = createAdvertisement();
567
568 if (Thread.currentThread().isInterrupted()) {
569 log.info("Interrupted, quitting");
570 return;
571 }
572
573 try {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800574 unicastMessage(peer, HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700575 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -0700576 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700577 return;
578 }
579 } catch (Exception e) {
580 // catch all Exception to avoid Scheduled task being suppressed.
581 log.error("Exception thrown while sending advertisement", e);
582 }
583 }
584 }
585
586 private HostAntiEntropyAdvertisement createAdvertisement() {
587 final NodeId self = clusterService.getLocalNode().id();
588
589 Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
590 Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
591
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800592 hosts.forEach((hostId, hostInfo) -> {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700593 final ProviderId providerId = hostInfo.providerId();
594 timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800595 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700596
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800597 removedHosts.forEach((hostId, timestamped) -> {
598 tombstones.put(hostId, timestamped.timestamp());
599 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700600
601 return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
602 }
603
604 private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
605
606 final NodeId sender = ad.sender();
607
608 for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
609 // for each locally live Hosts...
610 final HostId hostId = host.getKey();
611 final StoredHost localHost = host.getValue();
612 final ProviderId providerId = localHost.providerId();
613 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
614 final Timestamp localLiveTimestamp = localHost.timestamp();
615
616 Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
617 if (remoteTimestamp == null) {
618 remoteTimestamp = ad.tombstones().get(hostId);
619 }
620 if (remoteTimestamp == null ||
621 localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
622
623 // local is more recent, push
624 // TODO: annotation is lost
625 final HostDescription desc = new DefaultHostDescription(
626 localHost.mac(),
627 localHost.vlan(),
628 localHost.location(),
629 localHost.ipAddresses());
630 try {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800631 unicastMessage(sender, HOST_UPDATED_MSG,
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700632 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
633 } catch (IOException e1) {
634 log.debug("Failed to send advertisement response", e1);
635 }
636 }
637
638 final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
639 if (remoteDeadTimestamp != null &&
640 remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
641 // sender has recent remove
642 notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
643 }
644 }
645
646 for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
647 // for each locally dead Hosts
648 final HostId hostId = dead.getKey();
649 final Timestamp localDeadTimestamp = dead.getValue().timestamp();
650
651 // TODO: pick proper ProviderId, when supporting multi-provider
652 final ProviderId providerId = dead.getValue().value().providerId();
653 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
654
655 final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
656 if (remoteLiveTimestamp != null &&
657 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
658 // sender has zombie, push
659 try {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800660 unicastMessage(sender, HOST_REMOVED_MSG,
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700661 new InternalHostRemovedEvent(hostId, localDeadTimestamp));
662 } catch (IOException e1) {
663 log.debug("Failed to send advertisement response", e1);
664 }
665 }
666 }
667
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700668 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
669 // for each remote tombstone advertisement...
670 final HostId hostId = e.getKey();
671 final Timestamp adRemoveTimestamp = e.getValue();
672
673 final StoredHost storedHost = hosts.get(hostId);
674 if (storedHost == null) {
675 continue;
676 }
677 if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
678 // sender has recent remove info, locally remove
679 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
680 }
681 }
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800682
683 // if remote ad has something unknown, actively sync
684 for (HostFragmentId key : ad.timestamps().keySet()) {
685 if (!hosts.containsKey(key.hostId())) {
686 HostAntiEntropyAdvertisement myAd = createAdvertisement();
687 try {
688 unicastMessage(sender, HOST_ANTI_ENTROPY_ADVERTISEMENT, myAd);
689 break;
690 } catch (IOException e) {
691 log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
692 }
693 }
694 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700695 }
696
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800697 private final class InternalHostAntiEntropyAdvertisementListener
698 implements ClusterMessageHandler {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700699
700 @Override
701 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800702 log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700703 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800704 try {
705 handleAntiEntropyAdvertisement(advertisement);
706 } catch (Exception e) {
707 log.warn("Exception thrown handling Host advertisements", e);
708 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700709 }
710 }
Madan Jampaniab994042014-10-13 15:34:04 -0700711}