blob: ed838280009c148a3931d563e75f9201a96bf5be [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.host.impl;
Madan Jampaniab994042014-10-13 15:34:04 -070017
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -080018import static com.google.common.base.Preconditions.checkNotNull;
19import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
20import static com.google.common.collect.Multimaps.newSetMultimap;
21import static com.google.common.collect.Sets.newConcurrentHashSet;
Jonathan Harta887ba82014-11-03 15:20:52 -080022import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Brian O'Connorabafb502014-12-02 22:26:20 -080023import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
24import static org.onosproject.net.DefaultAnnotations.merge;
25import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
Brian O'Connorabafb502014-12-02 22:26:20 -080026import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -080027import static org.onosproject.store.host.impl.GossipHostStoreMessageSubjects.*;
Jonathan Harta887ba82014-11-03 15:20:52 -080028import static org.onlab.util.Tools.namedThreads;
Yuta HIGUCHI06586272014-11-25 14:27:03 -080029import static org.onlab.util.Tools.minPriority;
Jonathan Harta887ba82014-11-03 15:20:52 -080030import static org.slf4j.LoggerFactory.getLogger;
31
32import java.io.IOException;
33import java.util.Collections;
34import java.util.HashMap;
35import java.util.HashSet;
36import java.util.Map;
37import java.util.Map.Entry;
38import java.util.Set;
39import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHI80d56592014-11-25 15:11:13 -080040import java.util.concurrent.ExecutorService;
41import java.util.concurrent.Executors;
Jonathan Harta887ba82014-11-03 15:20:52 -080042import java.util.concurrent.ScheduledExecutorService;
43import java.util.concurrent.TimeUnit;
Madan Jampaniab994042014-10-13 15:34:04 -070044
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -070045import org.apache.commons.lang3.RandomUtils;
Madan Jampaniab994042014-10-13 15:34:04 -070046import org.apache.felix.scr.annotations.Activate;
47import org.apache.felix.scr.annotations.Component;
48import org.apache.felix.scr.annotations.Deactivate;
49import org.apache.felix.scr.annotations.Reference;
50import org.apache.felix.scr.annotations.ReferenceCardinality;
51import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080052import org.onosproject.cluster.ClusterService;
53import org.onosproject.cluster.ControllerNode;
54import org.onosproject.cluster.NodeId;
55import org.onosproject.net.Annotations;
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -080056import org.onosproject.net.AnnotationsUtil;
Brian O'Connorabafb502014-12-02 22:26:20 -080057import org.onosproject.net.ConnectPoint;
58import org.onosproject.net.DefaultAnnotations;
59import org.onosproject.net.DefaultHost;
60import org.onosproject.net.DeviceId;
61import org.onosproject.net.Host;
62import org.onosproject.net.HostId;
63import org.onosproject.net.HostLocation;
64import org.onosproject.net.host.DefaultHostDescription;
65import org.onosproject.net.host.HostClockService;
66import org.onosproject.net.host.HostDescription;
67import org.onosproject.net.host.HostEvent;
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -080068import org.onosproject.net.host.HostEvent.Type;
Brian O'Connorabafb502014-12-02 22:26:20 -080069import org.onosproject.net.host.HostStore;
70import org.onosproject.net.host.HostStoreDelegate;
71import org.onosproject.net.host.PortAddresses;
72import org.onosproject.net.provider.ProviderId;
73import org.onosproject.store.AbstractStore;
74import org.onosproject.store.Timestamp;
75import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
76import org.onosproject.store.cluster.messaging.ClusterMessage;
77import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
78import org.onosproject.store.cluster.messaging.MessageSubject;
79import org.onosproject.store.impl.Timestamped;
80import org.onosproject.store.serializers.KryoSerializer;
81import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -070082import org.onlab.packet.IpAddress;
Madan Jampaniab994042014-10-13 15:34:04 -070083import org.onlab.packet.MacAddress;
84import org.onlab.packet.VlanId;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070085import org.onlab.util.KryoNamespace;
Madan Jampaniab994042014-10-13 15:34:04 -070086import org.slf4j.Logger;
87
Jonathan Harta887ba82014-11-03 15:20:52 -080088import com.google.common.collect.FluentIterable;
89import com.google.common.collect.HashMultimap;
90import com.google.common.collect.ImmutableList;
91import com.google.common.collect.ImmutableSet;
92import com.google.common.collect.Multimap;
93import com.google.common.collect.Multimaps;
94import com.google.common.collect.SetMultimap;
Madan Jampaniab994042014-10-13 15:34:04 -070095
Madan Jampaniab994042014-10-13 15:34:04 -070096/**
97 * Manages inventory of end-station hosts in distributed data store
98 * that uses optimistic replication and gossip based techniques.
99 */
100@Component(immediate = true)
101@Service
102public class GossipHostStore
103 extends AbstractStore<HostEvent, HostStoreDelegate>
104 implements HostStore {
105
106 private final Logger log = getLogger(getClass());
107
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800108 // TODO: make this configurable
109 private int hostsExpected = 2000000;
Madan Jampaniab994042014-10-13 15:34:04 -0700110
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800111 // Host inventory
112 private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
113
114 private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
Madan Jampaniab994042014-10-13 15:34:04 -0700115
116 // Hosts tracked by their location
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800117 private final Multimap<ConnectPoint, Host> locations
118 = synchronizedSetMultimap(newSetMultimap(new ConcurrentHashMap<>(),
119 () -> newConcurrentHashSet()));
Madan Jampaniab994042014-10-13 15:34:04 -0700120
Jonathan Harta887ba82014-11-03 15:20:52 -0800121 private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
122 Multimaps.synchronizedSetMultimap(
123 HashMultimap.<ConnectPoint, PortAddresses>create());
Madan Jampaniab994042014-10-13 15:34:04 -0700124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
126 protected HostClockService hostClockService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected ClusterCommunicationService clusterCommunicator;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected ClusterService clusterService;
133
Madan Jampani312a2982014-10-14 21:07:16 -0700134 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
135 @Override
136 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700137 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800138 .register(DistributedStoreSerializers.STORE_COMMON)
139 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700140 .register(InternalHostEvent.class)
Madan Jampani312a2982014-10-14 21:07:16 -0700141 .register(InternalHostRemovedEvent.class)
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700142 .register(HostFragmentId.class)
143 .register(HostAntiEntropyAdvertisement.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800144 .build();
Madan Jampani312a2982014-10-14 21:07:16 -0700145 }
146 };
147
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800148 private ExecutorService executor;
149
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800150 private ScheduledExecutorService backgroundExecutor;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700151
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800152 // TODO: Make these anti-entropy params configurable
153 private long initialDelaySec = 5;
154 private long periodSec = 5;
155
Madan Jampaniab994042014-10-13 15:34:04 -0700156 @Activate
157 public void activate() {
Madan Jampani255618f2014-10-14 23:27:57 -0700158 clusterCommunicator.addSubscriber(
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800159 HOST_UPDATED_MSG,
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700160 new InternalHostEventListener());
Madan Jampani255618f2014-10-14 23:27:57 -0700161 clusterCommunicator.addSubscriber(
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800162 HOST_REMOVED_MSG,
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700163 new InternalHostRemovedEventListener());
164 clusterCommunicator.addSubscriber(
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800165 HOST_ANTI_ENTROPY_ADVERTISEMENT,
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700166 new InternalHostAntiEntropyAdvertisementListener());
167
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800168 executor = Executors.newCachedThreadPool(namedThreads("onos-host-fg-%d"));
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800169
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800170 backgroundExecutor =
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800171 newSingleThreadScheduledExecutor(minPriority(namedThreads("onos-host-bg-%d")));
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700172
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700173 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800174 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700175 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani255618f2014-10-14 23:27:57 -0700176
Madan Jampaniab994042014-10-13 15:34:04 -0700177 log.info("Started");
178 }
179
180 @Deactivate
181 public void deactivate() {
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800182 executor.shutdownNow();
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800183 backgroundExecutor.shutdownNow();
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700184 try {
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800185 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700186 log.error("Timeout during executor shutdown");
187 }
188 } catch (InterruptedException e) {
189 log.error("Error during executor shutdown", e);
190 }
191
192 hosts.clear();
193 removedHosts.clear();
194 locations.clear();
195 portAddresses.clear();
196
Madan Jampaniab994042014-10-13 15:34:04 -0700197 log.info("Stopped");
198 }
199
200 @Override
201 public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
202 HostDescription hostDescription) {
203 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700204 HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
205 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800206 log.debug("Notifying peers of a host topology event for providerId: "
Madan Jampani312a2982014-10-14 21:07:16 -0700207 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
208 try {
209 notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
210 } catch (IOException e) {
211 log.error("Failed to notify peers of a host topology event for providerId: "
212 + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
213 }
214 }
215 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700216 }
217
218 private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
219 HostDescription hostDescription, Timestamp timestamp) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800220 // If this host was previously removed, first ensure
221 // this new request is "newer"
222 if (isHostRemoved(hostId, timestamp)) {
223 log.debug("Ignoring update for removed host {}@{}",
224 hostDescription, timestamp);
225 return null;
226 }
Madan Jampaniab994042014-10-13 15:34:04 -0700227 StoredHost host = hosts.get(hostId);
228 if (host == null) {
229 return createHost(providerId, hostId, hostDescription, timestamp);
230 }
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800231 return updateHost(providerId, hostId, host, hostDescription, timestamp);
232 }
233
234 /**
235 * @param hostId host identifier
236 * @param timestamp timstamp to compare with
237 * @return true if given timestamp is more recent timestamp compared to
238 * the timestamp Host was removed.
239 */
240 private boolean isHostRemoved(HostId hostId, Timestamp timestamp) {
241 Timestamped<Host> removedInfo = removedHosts.get(hostId);
242 if (removedInfo != null) {
243 if (removedInfo.isNewer(timestamp)) {
244 return true;
245 }
246 removedHosts.remove(hostId, removedInfo);
247 }
248 return false;
Madan Jampaniab994042014-10-13 15:34:04 -0700249 }
250
251 // creates a new host and sends HOST_ADDED
252 private HostEvent createHost(ProviderId providerId, HostId hostId,
253 HostDescription descr, Timestamp timestamp) {
254 synchronized (this) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800255 StoredHost newhost = new StoredHost(timestamp, providerId, hostId,
Madan Jampaniab994042014-10-13 15:34:04 -0700256 descr.hwAddress(),
257 descr.vlan(),
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800258 descr.location(),
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700259 ImmutableSet.copyOf(descr.ipAddress()));
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800260 StoredHost concAdd = hosts.putIfAbsent(hostId, newhost);
261 if (concAdd != null) {
262 // concurrent add detected, retry from start
263 return updateHost(providerId, hostId, concAdd, descr, timestamp);
264 }
Madan Jampaniab994042014-10-13 15:34:04 -0700265 locations.put(descr.location(), newhost);
266 return new HostEvent(HOST_ADDED, newhost);
267 }
268 }
269
270 // checks for type of update to host, sends appropriate event
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800271 private HostEvent updateHost(ProviderId providerId, HostId hostId, StoredHost oldHost,
Madan Jampaniab994042014-10-13 15:34:04 -0700272 HostDescription descr, Timestamp timestamp) {
Madan Jampaniab994042014-10-13 15:34:04 -0700273
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800274 if (timestamp.compareTo(oldHost.timestamp()) < 0) {
275 // new timestamp is older
276 log.debug("Ignoring outdated host update {}@{}", descr, timestamp);
Madan Jampaniab994042014-10-13 15:34:04 -0700277 return null;
278 }
279
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800280 final boolean hostMoved = !oldHost.location().equals(descr.location());
281 if (hostMoved ||
282 !oldHost.ipAddresses().containsAll(descr.ipAddress()) ||
283 !descr.annotations().keys().isEmpty()) {
284
285 Set<IpAddress> addresses = new HashSet<>(oldHost.ipAddresses());
286 addresses.addAll(descr.ipAddress());
287 Annotations annotations = merge((DefaultAnnotations) oldHost.annotations(),
288 descr.annotations());
289
290 Timestamp newTimestamp = timestamp;
291 // if merged Set/Annotation differ from description...
292 final boolean deltaUpdate = !descr.ipAddress().equals(addresses) ||
293 !AnnotationsUtil.isEqual(descr.annotations(), annotations);
294 if (deltaUpdate) {
295 // ..then local existing info had something description didn't
296 newTimestamp = hostClockService.getTimestamp(hostId);
297 log.debug("delta update detected on {}, substepping timestamp to {}",
298 hostId, newTimestamp);
299 }
300
301 StoredHost updated = new StoredHost(newTimestamp,
302 providerId, oldHost.id(),
303 oldHost.mac(), oldHost.vlan(),
304 descr.location(),
305 addresses,
306 annotations);
307 synchronized (this) {
308 boolean replaced = hosts.replace(hostId, oldHost, updated);
309 if (!replaced) {
310 // concurrent update, retry
311 return createOrUpdateHostInternal(providerId, hostId, descr, timestamp);
312 }
313 locations.remove(oldHost.location(), oldHost);
314 locations.put(updated.location(), updated);
315
316 HostEvent.Type eventType;
317 if (hostMoved) {
318 eventType = Type.HOST_MOVED;
319 } else {
320 eventType = Type.HOST_UPDATED;
321 }
322 return new HostEvent(eventType, updated);
323 }
Madan Jampaniab994042014-10-13 15:34:04 -0700324 }
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800325 return null;
Madan Jampaniab994042014-10-13 15:34:04 -0700326 }
327
328 @Override
329 public HostEvent removeHost(HostId hostId) {
330 Timestamp timestamp = hostClockService.getTimestamp(hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700331 HostEvent event = removeHostInternal(hostId, timestamp);
332 if (event != null) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800333 log.debug("Notifying peers of a host removed topology event for hostId: {}", hostId);
Madan Jampani312a2982014-10-14 21:07:16 -0700334 try {
335 notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
336 } catch (IOException e) {
337 log.info("Failed to notify peers of a host removed topology event for hostId: {}", hostId);
338 }
339 }
340 return event;
Madan Jampaniab994042014-10-13 15:34:04 -0700341 }
342
343 private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
344 synchronized (this) {
345 Host host = hosts.remove(hostId);
346 if (host != null) {
347 locations.remove((host.location()), host);
348 removedHosts.put(hostId, new Timestamped<>(host, timestamp));
349 return new HostEvent(HOST_REMOVED, host);
350 }
351 return null;
352 }
353 }
354
355 @Override
356 public int getHostCount() {
357 return hosts.size();
358 }
359
360 @Override
361 public Iterable<Host> getHosts() {
362 return ImmutableSet.<Host>copyOf(hosts.values());
363 }
364
365 @Override
366 public Host getHost(HostId hostId) {
367 return hosts.get(hostId);
368 }
369
370 @Override
371 public Set<Host> getHosts(VlanId vlanId) {
372 Set<Host> vlanset = new HashSet<>();
373 for (Host h : hosts.values()) {
374 if (h.vlan().equals(vlanId)) {
375 vlanset.add(h);
376 }
377 }
378 return vlanset;
379 }
380
381 @Override
382 public Set<Host> getHosts(MacAddress mac) {
383 Set<Host> macset = new HashSet<>();
384 for (Host h : hosts.values()) {
385 if (h.mac().equals(mac)) {
386 macset.add(h);
387 }
388 }
389 return macset;
390 }
391
392 @Override
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700393 public Set<Host> getHosts(IpAddress ip) {
Madan Jampaniab994042014-10-13 15:34:04 -0700394 Set<Host> ipset = new HashSet<>();
395 for (Host h : hosts.values()) {
396 if (h.ipAddresses().contains(ip)) {
397 ipset.add(h);
398 }
399 }
400 return ipset;
401 }
402
403 @Override
404 public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
405 return ImmutableSet.copyOf(locations.get(connectPoint));
406 }
407
408 @Override
409 public Set<Host> getConnectedHosts(DeviceId deviceId) {
410 Set<Host> hostset = new HashSet<>();
411 for (ConnectPoint p : locations.keySet()) {
412 if (p.deviceId().equals(deviceId)) {
413 hostset.addAll(locations.get(p));
414 }
415 }
416 return hostset;
417 }
418
419 @Override
420 public void updateAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800421 portAddresses.put(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700422 }
423
424 @Override
425 public void removeAddressBindings(PortAddresses addresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800426 portAddresses.remove(addresses.connectPoint(), addresses);
Madan Jampaniab994042014-10-13 15:34:04 -0700427 }
428
429 @Override
430 public void clearAddressBindings(ConnectPoint connectPoint) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800431 portAddresses.removeAll(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700432 }
433
434 @Override
435 public Set<PortAddresses> getAddressBindings() {
436 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800437 return ImmutableSet.copyOf(portAddresses.values());
Madan Jampaniab994042014-10-13 15:34:04 -0700438 }
439 }
440
441 @Override
Jonathan Harta887ba82014-11-03 15:20:52 -0800442 public Set<PortAddresses> getAddressBindingsForPort(ConnectPoint connectPoint) {
Madan Jampaniab994042014-10-13 15:34:04 -0700443 synchronized (portAddresses) {
Jonathan Harta887ba82014-11-03 15:20:52 -0800444 Set<PortAddresses> addresses = portAddresses.get(connectPoint);
Madan Jampaniab994042014-10-13 15:34:04 -0700445
Jonathan Harta887ba82014-11-03 15:20:52 -0800446 if (addresses == null) {
447 return Collections.emptySet();
448 } else {
449 return ImmutableSet.copyOf(addresses);
450 }
Madan Jampaniab994042014-10-13 15:34:04 -0700451 }
Madan Jampaniab994042014-10-13 15:34:04 -0700452 }
453
Yuta HIGUCHIe5ca93b2014-10-23 09:49:00 -0700454 private static final class StoredHost extends DefaultHost {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800455 private final Timestamp timestamp;
Madan Jampaniab994042014-10-13 15:34:04 -0700456
457 /**
458 * Creates an end-station host using the supplied information.
459 *
460 * @param providerId provider identity
461 * @param id host identifier
462 * @param mac host MAC address
463 * @param vlan host VLAN identifier
464 * @param location host location
465 * @param ips host IP addresses
466 * @param annotations optional key/value annotations
467 */
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800468 public StoredHost(Timestamp timestamp, ProviderId providerId, HostId id,
469 MacAddress mac, VlanId vlan, HostLocation location,
Pavlin Radoslavov33f228a2014-10-27 19:33:16 -0700470 Set<IpAddress> ips, Annotations... annotations) {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800471 super(providerId, id, mac, vlan, location, ips, annotations);
472 this.timestamp = checkNotNull(timestamp);
Madan Jampaniab994042014-10-13 15:34:04 -0700473 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700474
475 public Timestamp timestamp() {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800476 return timestamp;
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700477 }
Madan Jampaniab994042014-10-13 15:34:04 -0700478 }
Madan Jampani312a2982014-10-14 21:07:16 -0700479
480 private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800481 broadcastMessage(HOST_REMOVED_MSG, event);
Madan Jampani312a2982014-10-14 21:07:16 -0700482 }
483
484 private void notifyPeers(InternalHostEvent event) throws IOException {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800485 broadcastMessage(HOST_UPDATED_MSG, event);
Madan Jampani312a2982014-10-14 21:07:16 -0700486 }
487
488 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
489 ClusterMessage message = new ClusterMessage(
490 clusterService.getLocalNode().id(),
491 subject,
492 SERIALIZER.encode(event));
493 clusterCommunicator.broadcast(message);
494 }
Madan Jampani255618f2014-10-14 23:27:57 -0700495
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700496 private void unicastMessage(NodeId peer,
497 MessageSubject subject,
498 Object event) throws IOException {
499 ClusterMessage message = new ClusterMessage(
500 clusterService.getLocalNode().id(),
501 subject,
502 SERIALIZER.encode(event));
503 clusterCommunicator.unicast(message, peer);
504 }
505
Madan Jampani255618f2014-10-14 23:27:57 -0700506 private void notifyDelegateIfNotNull(HostEvent event) {
507 if (event != null) {
508 notifyDelegate(event);
509 }
510 }
511
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800512 private final class InternalHostEventListener
513 implements ClusterMessageHandler {
Madan Jampani255618f2014-10-14 23:27:57 -0700514 @Override
515 public void handle(ClusterMessage message) {
516
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800517 log.debug("Received host update event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800518 InternalHostEvent event = SERIALIZER.decode(message.payload());
Madan Jampani255618f2014-10-14 23:27:57 -0700519
520 ProviderId providerId = event.providerId();
521 HostId hostId = event.hostId();
522 HostDescription hostDescription = event.hostDescription();
523 Timestamp timestamp = event.timestamp();
524
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800525 executor.submit(new Runnable() {
526
527 @Override
528 public void run() {
529 try {
530 notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
531 hostId,
532 hostDescription,
533 timestamp));
534 } catch (Exception e) {
535 log.warn("Exception thrown handling host removed", e);
536 }
537 }
538 });
Madan Jampani255618f2014-10-14 23:27:57 -0700539 }
540 }
541
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800542 private final class InternalHostRemovedEventListener
543 implements ClusterMessageHandler {
Madan Jampani255618f2014-10-14 23:27:57 -0700544 @Override
545 public void handle(ClusterMessage message) {
546
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800547 log.debug("Received host removed event from peer: {}", message.sender());
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800548 InternalHostRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani255618f2014-10-14 23:27:57 -0700549
550 HostId hostId = event.hostId();
551 Timestamp timestamp = event.timestamp();
552
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800553 executor.submit(new Runnable() {
554
555 @Override
556 public void run() {
557 try {
558 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
559 } catch (Exception e) {
560 log.warn("Exception thrown handling host removed", e);
561 }
562 }
563 });
Madan Jampani255618f2014-10-14 23:27:57 -0700564 }
565 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700566
567 private final class SendAdvertisementTask implements Runnable {
568
569 @Override
570 public void run() {
571 if (Thread.currentThread().isInterrupted()) {
572 log.info("Interrupted, quitting");
573 return;
574 }
575
576 try {
577 final NodeId self = clusterService.getLocalNode().id();
578 Set<ControllerNode> nodes = clusterService.getNodes();
579
580 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
581 .transform(toNodeId())
582 .toList();
583
584 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800585 log.trace("No other peers in the cluster.");
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700586 return;
587 }
588
589 NodeId peer;
590 do {
591 int idx = RandomUtils.nextInt(0, nodeIds.size());
592 peer = nodeIds.get(idx);
593 } while (peer.equals(self));
594
595 HostAntiEntropyAdvertisement ad = createAdvertisement();
596
597 if (Thread.currentThread().isInterrupted()) {
598 log.info("Interrupted, quitting");
599 return;
600 }
601
602 try {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800603 unicastMessage(peer, HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700604 } catch (IOException e) {
Yuta HIGUCHI78f3a0a2014-10-16 17:24:20 -0700605 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700606 return;
607 }
608 } catch (Exception e) {
609 // catch all Exception to avoid Scheduled task being suppressed.
610 log.error("Exception thrown while sending advertisement", e);
611 }
612 }
613 }
614
615 private HostAntiEntropyAdvertisement createAdvertisement() {
616 final NodeId self = clusterService.getLocalNode().id();
617
618 Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
619 Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
620
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800621 hosts.forEach((hostId, hostInfo) -> {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700622 final ProviderId providerId = hostInfo.providerId();
623 timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800624 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700625
Yuta HIGUCHI4423acc2014-11-25 12:34:44 -0800626 removedHosts.forEach((hostId, timestamped) -> {
627 tombstones.put(hostId, timestamped.timestamp());
628 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700629
630 return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
631 }
632
633 private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
634
635 final NodeId sender = ad.sender();
636
637 for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
638 // for each locally live Hosts...
639 final HostId hostId = host.getKey();
640 final StoredHost localHost = host.getValue();
641 final ProviderId providerId = localHost.providerId();
642 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
643 final Timestamp localLiveTimestamp = localHost.timestamp();
644
645 Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
646 if (remoteTimestamp == null) {
647 remoteTimestamp = ad.tombstones().get(hostId);
648 }
649 if (remoteTimestamp == null ||
650 localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
651
652 // local is more recent, push
653 // TODO: annotation is lost
654 final HostDescription desc = new DefaultHostDescription(
655 localHost.mac(),
656 localHost.vlan(),
657 localHost.location(),
658 localHost.ipAddresses());
659 try {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800660 unicastMessage(sender, HOST_UPDATED_MSG,
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700661 new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
662 } catch (IOException e1) {
663 log.debug("Failed to send advertisement response", e1);
664 }
665 }
666
667 final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
668 if (remoteDeadTimestamp != null &&
669 remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
670 // sender has recent remove
671 notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
672 }
673 }
674
675 for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
676 // for each locally dead Hosts
677 final HostId hostId = dead.getKey();
678 final Timestamp localDeadTimestamp = dead.getValue().timestamp();
679
680 // TODO: pick proper ProviderId, when supporting multi-provider
681 final ProviderId providerId = dead.getValue().value().providerId();
682 final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
683
684 final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
685 if (remoteLiveTimestamp != null &&
686 localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
687 // sender has zombie, push
688 try {
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800689 unicastMessage(sender, HOST_REMOVED_MSG,
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700690 new InternalHostRemovedEvent(hostId, localDeadTimestamp));
691 } catch (IOException e1) {
692 log.debug("Failed to send advertisement response", e1);
693 }
694 }
695 }
696
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700697 for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
698 // for each remote tombstone advertisement...
699 final HostId hostId = e.getKey();
700 final Timestamp adRemoveTimestamp = e.getValue();
701
702 final StoredHost storedHost = hosts.get(hostId);
703 if (storedHost == null) {
704 continue;
705 }
706 if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
707 // sender has recent remove info, locally remove
708 notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
709 }
710 }
Yuta HIGUCHIe59c7e32014-12-08 22:07:31 -0800711
712 // if remote ad has something unknown, actively sync
713 for (HostFragmentId key : ad.timestamps().keySet()) {
714 if (!hosts.containsKey(key.hostId())) {
715 HostAntiEntropyAdvertisement myAd = createAdvertisement();
716 try {
717 unicastMessage(sender, HOST_ANTI_ENTROPY_ADVERTISEMENT, myAd);
718 break;
719 } catch (IOException e) {
720 log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
721 }
722 }
723 }
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700724 }
725
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800726 private final class InternalHostAntiEntropyAdvertisementListener
727 implements ClusterMessageHandler {
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700728
729 @Override
730 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800731 log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700732 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800733 backgroundExecutor.submit(new Runnable() {
734
735 @Override
736 public void run() {
737 try {
738 handleAntiEntropyAdvertisement(advertisement);
739 } catch (Exception e) {
740 log.warn("Exception thrown handling Host advertisements", e);
741 }
742 }
743 });
Yuta HIGUCHI5fa3dc02014-10-15 17:08:13 -0700744 }
745 }
Madan Jampaniab994042014-10-13 15:34:04 -0700746}