blob: 2b6c1c8e888781616ef728e036734599d1ffc290 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.link.impl;
Madan Jampani2ff05592014-10-10 15:42:47 -070017
Ray Milkey7bbeb3f2014-12-11 14:59:26 -080018import java.io.IOException;
Yuta HIGUCHI800fac62014-12-11 19:23:01 -080019import java.util.Collection;
Ray Milkey7bbeb3f2014-12-11 14:59:26 -080020import java.util.Collections;
21import java.util.HashMap;
22import java.util.HashSet;
23import java.util.Map;
24import java.util.Map.Entry;
25import java.util.Objects;
26import java.util.Set;
27import java.util.concurrent.ConcurrentHashMap;
28import java.util.concurrent.ConcurrentMap;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
31import java.util.concurrent.ScheduledExecutorService;
32import java.util.concurrent.TimeUnit;
Yuta HIGUCHI06586272014-11-25 14:27:03 -080033
Madan Jampania97e8202014-10-10 17:01:33 -070034import org.apache.commons.lang3.RandomUtils;
Madan Jampani2ff05592014-10-10 15:42:47 -070035import org.apache.felix.scr.annotations.Activate;
36import org.apache.felix.scr.annotations.Component;
37import org.apache.felix.scr.annotations.Deactivate;
38import org.apache.felix.scr.annotations.Reference;
39import org.apache.felix.scr.annotations.ReferenceCardinality;
40import org.apache.felix.scr.annotations.Service;
Ray Milkey7bbeb3f2014-12-11 14:59:26 -080041import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.cluster.ClusterService;
43import org.onosproject.cluster.ControllerNode;
44import org.onosproject.cluster.NodeId;
45import org.onosproject.net.AnnotationKeys;
46import org.onosproject.net.AnnotationsUtil;
47import org.onosproject.net.ConnectPoint;
48import org.onosproject.net.DefaultAnnotations;
49import org.onosproject.net.DefaultLink;
50import org.onosproject.net.DeviceId;
51import org.onosproject.net.Link;
52import org.onosproject.net.Link.Type;
53import org.onosproject.net.LinkKey;
54import org.onosproject.net.SparseAnnotations;
55import org.onosproject.net.device.DeviceClockService;
56import org.onosproject.net.link.DefaultLinkDescription;
57import org.onosproject.net.link.LinkDescription;
58import org.onosproject.net.link.LinkEvent;
59import org.onosproject.net.link.LinkStore;
60import org.onosproject.net.link.LinkStoreDelegate;
61import org.onosproject.net.provider.ProviderId;
62import org.onosproject.store.AbstractStore;
63import org.onosproject.store.Timestamp;
64import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
65import org.onosproject.store.cluster.messaging.ClusterMessage;
66import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
67import org.onosproject.store.cluster.messaging.MessageSubject;
68import org.onosproject.store.impl.Timestamped;
69import org.onosproject.store.serializers.KryoSerializer;
70import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
Madan Jampani2ff05592014-10-10 15:42:47 -070071import org.slf4j.Logger;
72
Ray Milkey7bbeb3f2014-12-11 14:59:26 -080073import com.google.common.base.Function;
74import com.google.common.collect.FluentIterable;
Ray Milkey7bbeb3f2014-12-11 14:59:26 -080075import com.google.common.collect.ImmutableList;
Yuta HIGUCHI800fac62014-12-11 19:23:01 -080076import com.google.common.collect.Multimaps;
Ray Milkey7bbeb3f2014-12-11 14:59:26 -080077import com.google.common.collect.SetMultimap;
Yuta HIGUCHI800fac62014-12-11 19:23:01 -080078import com.google.common.collect.Sets;
Madan Jampani2ff05592014-10-10 15:42:47 -070079
Thomas Vachuska57126fe2014-11-11 17:13:24 -080080import static com.google.common.base.Preconditions.checkNotNull;
81import static com.google.common.base.Predicates.notNull;
82import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
Madan Jampania97e8202014-10-10 17:01:33 -070083import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Ray Milkey7bbeb3f2014-12-11 14:59:26 -080084import static org.onlab.util.Tools.minPriority;
85import static org.onlab.util.Tools.namedThreads;
Brian O'Connorabafb502014-12-02 22:26:20 -080086import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
87import static org.onosproject.net.DefaultAnnotations.merge;
88import static org.onosproject.net.DefaultAnnotations.union;
89import static org.onosproject.net.Link.State.ACTIVE;
90import static org.onosproject.net.Link.State.INACTIVE;
91import static org.onosproject.net.Link.Type.DIRECT;
92import static org.onosproject.net.Link.Type.INDIRECT;
93import static org.onosproject.net.LinkKey.linkKey;
Ray Milkey7bbeb3f2014-12-11 14:59:26 -080094import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
95import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
96import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
Brian O'Connorabafb502014-12-02 22:26:20 -080097import static org.onosproject.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
Madan Jampani2ff05592014-10-10 15:42:47 -070098import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani2ff05592014-10-10 15:42:47 -070099
100/**
101 * Manages inventory of infrastructure links in distributed data store
102 * that uses optimistic replication and gossip based techniques.
103 */
104@Component(immediate = true)
105@Service
106public class GossipLinkStore
107 extends AbstractStore<LinkEvent, LinkStoreDelegate>
108 implements LinkStore {
109
110 private final Logger log = getLogger(getClass());
111
112 // Link inventory
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700113 private final ConcurrentMap<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>> linkDescs =
Madan Jampani2ff05592014-10-10 15:42:47 -0700114 new ConcurrentHashMap<>();
115
116 // Link instance cache
117 private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();
118
119 // Egress and ingress link sets
120 private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
121 private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();
122
123 // Remove links
Yuta HIGUCHIb9125562014-12-01 23:28:22 -0800124 private final Map<LinkKey, Timestamp> removedLinks = new ConcurrentHashMap<>();
Madan Jampani2ff05592014-10-10 15:42:47 -0700125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700127 protected DeviceClockService deviceClockService;
Madan Jampani2ff05592014-10-10 15:42:47 -0700128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
130 protected ClusterCommunicationService clusterCommunicator;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected ClusterService clusterService;
134
Yuta HIGUCHI3e5d11a2014-11-04 14:16:44 -0800135 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani2ff05592014-10-10 15:42:47 -0700136 @Override
137 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700138 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800139 .register(DistributedStoreSerializers.STORE_COMMON)
140 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Madan Jampani2ff05592014-10-10 15:42:47 -0700141 .register(InternalLinkEvent.class)
142 .register(InternalLinkRemovedEvent.class)
Madan Jampanid2054d42014-10-10 17:27:06 -0700143 .register(LinkAntiEntropyAdvertisement.class)
144 .register(LinkFragmentId.class)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800145 .build();
Madan Jampani2ff05592014-10-10 15:42:47 -0700146 }
147 };
148
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800149 private ExecutorService executor;
150
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800151 private ScheduledExecutorService backgroundExecutors;
Madan Jampania97e8202014-10-10 17:01:33 -0700152
Madan Jampani2ff05592014-10-10 15:42:47 -0700153 @Activate
154 public void activate() {
155
156 clusterCommunicator.addSubscriber(
Madan Jampania97e8202014-10-10 17:01:33 -0700157 GossipLinkStoreMessageSubjects.LINK_UPDATE,
158 new InternalLinkEventListener());
Madan Jampani2ff05592014-10-10 15:42:47 -0700159 clusterCommunicator.addSubscriber(
Madan Jampania97e8202014-10-10 17:01:33 -0700160 GossipLinkStoreMessageSubjects.LINK_REMOVED,
161 new InternalLinkRemovedEventListener());
162 clusterCommunicator.addSubscriber(
163 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
164 new InternalLinkAntiEntropyAdvertisementListener());
165
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800166 executor = Executors.newCachedThreadPool(namedThreads("link-fg-%d"));
167
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800168 backgroundExecutors =
169 newSingleThreadScheduledExecutor(minPriority(namedThreads("link-bg-%d")));
Madan Jampania97e8202014-10-10 17:01:33 -0700170
Madan Jampania97e8202014-10-10 17:01:33 -0700171 long initialDelaySec = 5;
172 long periodSec = 5;
173 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800174 backgroundExecutors.scheduleAtFixedRate(new SendAdvertisementTask(),
Madan Jampania97e8202014-10-10 17:01:33 -0700175 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani2ff05592014-10-10 15:42:47 -0700176
177 log.info("Started");
178 }
179
180 @Deactivate
181 public void deactivate() {
Madan Jampani3ffbb272014-10-13 11:19:37 -0700182
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800183 executor.shutdownNow();
184
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800185 backgroundExecutors.shutdownNow();
Madan Jampani3ffbb272014-10-13 11:19:37 -0700186 try {
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800187 if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
Madan Jampani3ffbb272014-10-13 11:19:37 -0700188 log.error("Timeout during executor shutdown");
189 }
190 } catch (InterruptedException e) {
191 log.error("Error during executor shutdown", e);
192 }
193
Madan Jampani2ff05592014-10-10 15:42:47 -0700194 linkDescs.clear();
195 links.clear();
196 srcLinks.clear();
197 dstLinks.clear();
198 log.info("Stopped");
199 }
200
201 @Override
202 public int getLinkCount() {
203 return links.size();
204 }
205
206 @Override
207 public Iterable<Link> getLinks() {
208 return Collections.unmodifiableCollection(links.values());
209 }
210
211 @Override
212 public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
213 // lock for iteration
214 synchronized (srcLinks) {
215 return FluentIterable.from(srcLinks.get(deviceId))
216 .transform(lookupLink())
217 .filter(notNull())
218 .toSet();
219 }
220 }
221
222 @Override
223 public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
224 // lock for iteration
225 synchronized (dstLinks) {
226 return FluentIterable.from(dstLinks.get(deviceId))
227 .transform(lookupLink())
228 .filter(notNull())
229 .toSet();
230 }
231 }
232
233 @Override
234 public Link getLink(ConnectPoint src, ConnectPoint dst) {
Yuta HIGUCHI18ab8a92014-10-13 11:16:19 -0700235 return links.get(linkKey(src, dst));
Madan Jampani2ff05592014-10-10 15:42:47 -0700236 }
237
238 @Override
239 public Set<Link> getEgressLinks(ConnectPoint src) {
240 Set<Link> egress = new HashSet<>();
241 for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
242 if (linkKey.src().equals(src)) {
Ray Milkey7bbeb3f2014-12-11 14:59:26 -0800243 Link link = links.get(linkKey);
244 if (link != null) {
245 egress.add(link);
246 } else {
247 log.debug("Egress link for {} was null, skipped", linkKey);
248 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700249 }
250 }
251 return egress;
252 }
253
254 @Override
255 public Set<Link> getIngressLinks(ConnectPoint dst) {
256 Set<Link> ingress = new HashSet<>();
257 for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
258 if (linkKey.dst().equals(dst)) {
Ray Milkey7bbeb3f2014-12-11 14:59:26 -0800259 Link link = links.get(linkKey);
260 if (link != null) {
261 ingress.add(link);
262 } else {
263 log.debug("Ingress link for {} was null, skipped", linkKey);
264 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700265 }
266 }
267 return ingress;
268 }
269
270 @Override
271 public LinkEvent createOrUpdateLink(ProviderId providerId,
272 LinkDescription linkDescription) {
273
274 DeviceId dstDeviceId = linkDescription.dst().deviceId();
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700275 Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
Madan Jampani2ff05592014-10-10 15:42:47 -0700276
277 final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
278
Yuta HIGUCHI990aecc2014-10-13 22:21:42 -0700279 LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
Yuta HIGUCHI92cd51e2014-10-13 10:51:45 -0700280 final LinkEvent event;
281 final Timestamped<LinkDescription> mergedDesc;
alshabibdfc7afb2014-10-21 20:13:27 -0700282 Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
283 synchronized (map) {
Yuta HIGUCHI92cd51e2014-10-13 10:51:45 -0700284 event = createOrUpdateLinkInternal(providerId, deltaDesc);
alshabibdfc7afb2014-10-21 20:13:27 -0700285 mergedDesc = map.get(providerId);
Yuta HIGUCHI92cd51e2014-10-13 10:51:45 -0700286 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700287
288 if (event != null) {
289 log.info("Notifying peers of a link update topology event from providerId: "
290 + "{} between src: {} and dst: {}",
291 providerId, linkDescription.src(), linkDescription.dst());
292 try {
Yuta HIGUCHI92cd51e2014-10-13 10:51:45 -0700293 notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
Madan Jampani2ff05592014-10-10 15:42:47 -0700294 } catch (IOException e) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800295 log.debug("Failed to notify peers of a link update topology event from providerId: "
alshabibdfc7afb2014-10-21 20:13:27 -0700296 + "{} between src: {} and dst: {}",
297 providerId, linkDescription.src(), linkDescription.dst());
Madan Jampani2ff05592014-10-10 15:42:47 -0700298 }
299 }
300 return event;
301 }
302
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800303 @Override
304 public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) {
305 Link link = getLink(src, dst);
306 if (link == null) {
307 return null;
308 }
309
310 if (link.isDurable()) {
311 // FIXME: this is not the right thing to call for the gossip store; will not sync link state!!!
312 return link.state() == INACTIVE ? null :
313 updateLink(linkKey(link.src(), link.dst()), link,
314 new DefaultLink(link.providerId(),
315 link.src(), link.dst(),
316 link.type(), INACTIVE,
317 link.isDurable(),
318 link.annotations()));
319 }
320 return removeLink(src, dst);
321 }
322
Madan Jampani2ff05592014-10-10 15:42:47 -0700323 private LinkEvent createOrUpdateLinkInternal(
324 ProviderId providerId,
325 Timestamped<LinkDescription> linkDescription) {
326
Yuta HIGUCHIa85542b2014-10-21 19:29:49 -0700327 final LinkKey key = linkKey(linkDescription.value().src(),
328 linkDescription.value().dst());
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700329 Map<ProviderId, Timestamped<LinkDescription>> descs = getOrCreateLinkDescriptions(key);
Madan Jampani2ff05592014-10-10 15:42:47 -0700330
331 synchronized (descs) {
332 // if the link was previously removed, we should proceed if and
333 // only if this request is more recent.
334 Timestamp linkRemovedTimestamp = removedLinks.get(key);
335 if (linkRemovedTimestamp != null) {
336 if (linkDescription.isNewer(linkRemovedTimestamp)) {
337 removedLinks.remove(key);
338 } else {
Yuta HIGUCHIa85542b2014-10-21 19:29:49 -0700339 log.trace("Link {} was already removed ignoring.", key);
Madan Jampani2ff05592014-10-10 15:42:47 -0700340 return null;
341 }
342 }
343
344 final Link oldLink = links.get(key);
345 // update description
346 createOrUpdateLinkDescription(descs, providerId, linkDescription);
347 final Link newLink = composeLink(descs);
348 if (oldLink == null) {
349 return createLink(key, newLink);
350 }
351 return updateLink(key, oldLink, newLink);
352 }
353 }
354
355 // Guarded by linkDescs value (=locking each Link)
356 private Timestamped<LinkDescription> createOrUpdateLinkDescription(
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700357 Map<ProviderId, Timestamped<LinkDescription>> descs,
Madan Jampani2ff05592014-10-10 15:42:47 -0700358 ProviderId providerId,
359 Timestamped<LinkDescription> linkDescription) {
360
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700361 // merge existing annotations
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700362 Timestamped<LinkDescription> existingLinkDescription = descs.get(providerId);
Madan Jampani2ff05592014-10-10 15:42:47 -0700363 if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) {
Yuta HIGUCHIa85542b2014-10-21 19:29:49 -0700364 log.trace("local info is more up-to-date, ignoring {}.", linkDescription);
Madan Jampani2ff05592014-10-10 15:42:47 -0700365 return null;
366 }
367 Timestamped<LinkDescription> newLinkDescription = linkDescription;
368 if (existingLinkDescription != null) {
Yuta HIGUCHIa85542b2014-10-21 19:29:49 -0700369 // we only allow transition from INDIRECT -> DIRECT
370 final Type newType;
371 if (existingLinkDescription.value().type() == DIRECT) {
372 newType = DIRECT;
373 } else {
374 newType = linkDescription.value().type();
375 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700376 SparseAnnotations merged = union(existingLinkDescription.value().annotations(),
377 linkDescription.value().annotations());
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800378 newLinkDescription = new Timestamped<>(
Madan Jampani2ff05592014-10-10 15:42:47 -0700379 new DefaultLinkDescription(
380 linkDescription.value().src(),
381 linkDescription.value().dst(),
Yuta HIGUCHIa85542b2014-10-21 19:29:49 -0700382 newType, merged),
Madan Jampani2ff05592014-10-10 15:42:47 -0700383 linkDescription.timestamp());
384 }
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700385 return descs.put(providerId, newLinkDescription);
Madan Jampani2ff05592014-10-10 15:42:47 -0700386 }
387
388 // Creates and stores the link and returns the appropriate event.
389 // Guarded by linkDescs value (=locking each Link)
390 private LinkEvent createLink(LinkKey key, Link newLink) {
Madan Jampani2ff05592014-10-10 15:42:47 -0700391 links.put(key, newLink);
392 srcLinks.put(newLink.src().deviceId(), key);
393 dstLinks.put(newLink.dst().deviceId(), key);
394 return new LinkEvent(LINK_ADDED, newLink);
395 }
396
397 // Updates, if necessary the specified link and returns the appropriate event.
398 // Guarded by linkDescs value (=locking each Link)
399 private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
Yuta HIGUCHIa85542b2014-10-21 19:29:49 -0700400 // Note: INDIRECT -> DIRECT transition only
401 // so that BDDP discovered Link will not overwrite LDDP Link
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800402 if (oldLink.state() != newLink.state() ||
403 (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
Madan Jampani2ff05592014-10-10 15:42:47 -0700404 !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
405
406 links.put(key, newLink);
407 // strictly speaking following can be ommitted
408 srcLinks.put(oldLink.src().deviceId(), key);
409 dstLinks.put(oldLink.dst().deviceId(), key);
410 return new LinkEvent(LINK_UPDATED, newLink);
411 }
412 return null;
413 }
414
415 @Override
416 public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
Yuta HIGUCHI18ab8a92014-10-13 11:16:19 -0700417 final LinkKey key = linkKey(src, dst);
Madan Jampani2ff05592014-10-10 15:42:47 -0700418
419 DeviceId dstDeviceId = dst.deviceId();
Ayaka Koshibeb5c63a02014-10-18 18:42:27 -0700420 Timestamp timestamp = null;
421 try {
422 timestamp = deviceClockService.getTimestamp(dstDeviceId);
423 } catch (IllegalStateException e) {
Yuta HIGUCHId6a0ac32014-11-04 09:26:31 -0800424 log.warn("Failed to remove link {}, was not the master", key);
Ayaka Koshibeb5c63a02014-10-18 18:42:27 -0700425 //there are times when this is called before mastership
426 // handoff correctly completes.
427 return null;
428 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700429
430 LinkEvent event = removeLinkInternal(key, timestamp);
431
432 if (event != null) {
433 log.info("Notifying peers of a link removed topology event for a link "
434 + "between src: {} and dst: {}", src, dst);
435 try {
436 notifyPeers(new InternalLinkRemovedEvent(key, timestamp));
437 } catch (IOException e) {
438 log.error("Failed to notify peers of a link removed topology event for a link "
439 + "between src: {} and dst: {}", src, dst);
440 }
441 }
442 return event;
443 }
444
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700445 private static Timestamped<LinkDescription> getPrimaryDescription(
446 Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
447
Madan Jampani2ff05592014-10-10 15:42:47 -0700448 synchronized (linkDescriptions) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700449 for (Entry<ProviderId, Timestamped<LinkDescription>>
450 e : linkDescriptions.entrySet()) {
451
452 if (!e.getKey().isAncillary()) {
453 return e.getValue();
454 }
455 }
456 }
457 return null;
458 }
459
460
461 // TODO: consider slicing out as Timestamp utils
462 /**
463 * Checks is timestamp is more recent than timestamped object.
464 *
465 * @param timestamp to check if this is more recent then other
466 * @param timestamped object to be tested against
467 * @return true if {@code timestamp} is more recent than {@code timestamped}
468 * or {@code timestamped is null}
469 */
470 private static boolean isMoreRecent(Timestamp timestamp, Timestamped<?> timestamped) {
471 checkNotNull(timestamp);
472 if (timestamped == null) {
473 return true;
474 }
475 return timestamp.compareTo(timestamped.timestamp()) > 0;
476 }
477
478 private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
479 Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions
480 = getOrCreateLinkDescriptions(key);
481
482 synchronized (linkDescriptions) {
483 if (linkDescriptions.isEmpty()) {
484 // never seen such link before. keeping timestamp for record
485 removedLinks.put(key, timestamp);
486 return null;
487 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700488 // accept removal request if given timestamp is newer than
489 // the latest Timestamp from Primary provider
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700490 Timestamped<LinkDescription> prim = getPrimaryDescription(linkDescriptions);
491 if (!isMoreRecent(timestamp, prim)) {
492 // outdated remove request, ignore
Madan Jampani2ff05592014-10-10 15:42:47 -0700493 return null;
494 }
495 removedLinks.put(key, timestamp);
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800496 Link link = links.remove(key);
Madan Jampani2ff05592014-10-10 15:42:47 -0700497 linkDescriptions.clear();
498 if (link != null) {
499 srcLinks.remove(link.src().deviceId(), key);
500 dstLinks.remove(link.dst().deviceId(), key);
501 return new LinkEvent(LINK_REMOVED, link);
502 }
503 return null;
504 }
505 }
506
Yuta HIGUCHI800fac62014-12-11 19:23:01 -0800507 /**
508 * Creates concurrent readable, synchronized HashMultimap.
509 *
510 * @return SetMultimap
511 */
Madan Jampani2ff05592014-10-10 15:42:47 -0700512 private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
Yuta HIGUCHI800fac62014-12-11 19:23:01 -0800513 return synchronizedSetMultimap(
514 Multimaps.newSetMultimap(new ConcurrentHashMap<K, Collection<V>>(),
515 () -> Sets.newConcurrentHashSet()));
Madan Jampani2ff05592014-10-10 15:42:47 -0700516 }
517
518 /**
519 * @return primary ProviderID, or randomly chosen one if none exists
520 */
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700521 private static ProviderId pickBaseProviderId(
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700522 Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
Madan Jampani2ff05592014-10-10 15:42:47 -0700523
524 ProviderId fallBackPrimary = null;
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700525 for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) {
Madan Jampani2ff05592014-10-10 15:42:47 -0700526 if (!e.getKey().isAncillary()) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700527 // found primary
Madan Jampani2ff05592014-10-10 15:42:47 -0700528 return e.getKey();
529 } else if (fallBackPrimary == null) {
530 // pick randomly as a fallback in case there is no primary
531 fallBackPrimary = e.getKey();
532 }
533 }
534 return fallBackPrimary;
535 }
536
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700537 // Guarded by linkDescs value (=locking each Link)
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700538 private Link composeLink(Map<ProviderId, Timestamped<LinkDescription>> descs) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700539 ProviderId baseProviderId = pickBaseProviderId(descs);
540 Timestamped<LinkDescription> base = descs.get(baseProviderId);
Madan Jampani2ff05592014-10-10 15:42:47 -0700541
542 ConnectPoint src = base.value().src();
543 ConnectPoint dst = base.value().dst();
544 Type type = base.value().type();
545 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
546 annotations = merge(annotations, base.value().annotations());
547
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700548 for (Entry<ProviderId, Timestamped<LinkDescription>> e : descs.entrySet()) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700549 if (baseProviderId.equals(e.getKey())) {
Madan Jampani2ff05592014-10-10 15:42:47 -0700550 continue;
551 }
552
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800553 // Note: In the long run we should keep track of Description timestamp
Madan Jampani2ff05592014-10-10 15:42:47 -0700554 // and only merge conflicting keys when timestamp is newer
555 // Currently assuming there will never be a key conflict between
556 // providers
557
558 // annotation merging. not so efficient, should revisit later
559 annotations = merge(annotations, e.getValue().value().annotations());
560 }
561
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800562 boolean isDurable = Objects.equals(annotations.value(AnnotationKeys.DURABLE), "true");
Thomas Vachuskabadb93f2014-11-15 23:51:17 -0800563 return new DefaultLink(baseProviderId, src, dst, type, ACTIVE, isDurable, annotations);
Madan Jampani2ff05592014-10-10 15:42:47 -0700564 }
565
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700566 private Map<ProviderId, Timestamped<LinkDescription>> getOrCreateLinkDescriptions(LinkKey key) {
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700567 Map<ProviderId, Timestamped<LinkDescription>> r;
568 r = linkDescs.get(key);
569 if (r != null) {
570 return r;
571 }
572 r = new HashMap<>();
573 final Map<ProviderId, Timestamped<LinkDescription>> concurrentlyAdded;
574 concurrentlyAdded = linkDescs.putIfAbsent(key, r);
575 if (concurrentlyAdded != null) {
576 return concurrentlyAdded;
577 } else {
578 return r;
579 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700580 }
581
582 private final Function<LinkKey, Link> lookupLink = new LookupLink();
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800583
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700584 /**
585 * Returns a Function to lookup Link instance using LinkKey from cache.
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800586 *
587 * @return lookup link function
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700588 */
Madan Jampani2ff05592014-10-10 15:42:47 -0700589 private Function<LinkKey, Link> lookupLink() {
590 return lookupLink;
591 }
592
593 private final class LookupLink implements Function<LinkKey, Link> {
594 @Override
595 public Link apply(LinkKey input) {
Yuta HIGUCHI023295a2014-10-15 23:29:46 -0700596 if (input == null) {
597 return null;
598 } else {
599 return links.get(input);
600 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700601 }
602 }
603
Madan Jampani2ff05592014-10-10 15:42:47 -0700604 private void notifyDelegateIfNotNull(LinkEvent event) {
605 if (event != null) {
606 notifyDelegate(event);
607 }
608 }
609
Madan Jampani2ff05592014-10-10 15:42:47 -0700610 private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
611 ClusterMessage message = new ClusterMessage(
612 clusterService.getLocalNode().id(),
613 subject,
614 SERIALIZER.encode(event));
615 clusterCommunicator.broadcast(message);
616 }
617
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700618 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
619 ClusterMessage message = new ClusterMessage(
620 clusterService.getLocalNode().id(),
621 subject,
622 SERIALIZER.encode(event));
623 clusterCommunicator.unicast(message, recipient);
Madan Jampania97e8202014-10-10 17:01:33 -0700624 }
625
Madan Jampani2ff05592014-10-10 15:42:47 -0700626 private void notifyPeers(InternalLinkEvent event) throws IOException {
627 broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
628 }
629
630 private void notifyPeers(InternalLinkRemovedEvent event) throws IOException {
631 broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
632 }
633
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700634 // notify peer, silently ignoring error
Madan Jampania97e8202014-10-10 17:01:33 -0700635 private void notifyPeer(NodeId peer, InternalLinkEvent event) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700636 try {
637 unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
638 } catch (IOException e) {
639 log.debug("Failed to notify peer {} with message {}", peer, event);
640 }
Madan Jampania97e8202014-10-10 17:01:33 -0700641 }
642
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700643 // notify peer, silently ignoring error
Madan Jampania97e8202014-10-10 17:01:33 -0700644 private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700645 try {
646 unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
647 } catch (IOException e) {
648 log.debug("Failed to notify peer {} with message {}", peer, event);
649 }
Madan Jampania97e8202014-10-10 17:01:33 -0700650 }
651
652 private final class SendAdvertisementTask implements Runnable {
653
654 @Override
655 public void run() {
656 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800657 log.debug("Interrupted, quitting");
Madan Jampania97e8202014-10-10 17:01:33 -0700658 return;
659 }
660
661 try {
662 final NodeId self = clusterService.getLocalNode().id();
663 Set<ControllerNode> nodes = clusterService.getNodes();
664
665 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
666 .transform(toNodeId())
667 .toList();
668
669 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800670 log.trace("No other peers in the cluster.");
Madan Jampania97e8202014-10-10 17:01:33 -0700671 return;
672 }
673
674 NodeId peer;
675 do {
676 int idx = RandomUtils.nextInt(0, nodeIds.size());
677 peer = nodeIds.get(idx);
678 } while (peer.equals(self));
679
680 LinkAntiEntropyAdvertisement ad = createAdvertisement();
681
682 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800683 log.debug("Interrupted, quitting");
Madan Jampania97e8202014-10-10 17:01:33 -0700684 return;
685 }
686
687 try {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700688 unicastMessage(peer, LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
689 } catch (IOException e) {
690 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Madan Jampania97e8202014-10-10 17:01:33 -0700691 return;
692 }
693 } catch (Exception e) {
694 // catch all Exception to avoid Scheduled task being suppressed.
695 log.error("Exception thrown while sending advertisement", e);
696 }
697 }
698 }
699
700 private LinkAntiEntropyAdvertisement createAdvertisement() {
701 final NodeId self = clusterService.getLocalNode().id();
702
703 Map<LinkFragmentId, Timestamp> linkTimestamps = new HashMap<>(linkDescs.size());
704 Map<LinkKey, Timestamp> linkTombstones = new HashMap<>(removedLinks.size());
705
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -0800706 linkDescs.forEach((linkKey, linkDesc) -> {
Madan Jampania97e8202014-10-10 17:01:33 -0700707 synchronized (linkDesc) {
708 for (Map.Entry<ProviderId, Timestamped<LinkDescription>> e : linkDesc.entrySet()) {
709 linkTimestamps.put(new LinkFragmentId(linkKey, e.getKey()), e.getValue().timestamp());
710 }
711 }
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -0800712 });
Madan Jampania97e8202014-10-10 17:01:33 -0700713
714 linkTombstones.putAll(removedLinks);
715
716 return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
717 }
718
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700719 private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement ad) {
Madan Jampania97e8202014-10-10 17:01:33 -0700720
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700721 final NodeId sender = ad.sender();
722 boolean localOutdated = false;
Madan Jampania97e8202014-10-10 17:01:33 -0700723
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700724 for (Entry<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>>
725 l : linkDescs.entrySet()) {
Madan Jampania97e8202014-10-10 17:01:33 -0700726
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700727 final LinkKey key = l.getKey();
728 final Map<ProviderId, Timestamped<LinkDescription>> link = l.getValue();
729 synchronized (link) {
730 Timestamp localLatest = removedLinks.get(key);
Madan Jampania97e8202014-10-10 17:01:33 -0700731
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700732 for (Entry<ProviderId, Timestamped<LinkDescription>> p : link.entrySet()) {
733 final ProviderId providerId = p.getKey();
734 final Timestamped<LinkDescription> pDesc = p.getValue();
Madan Jampania97e8202014-10-10 17:01:33 -0700735
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700736 final LinkFragmentId fragId = new LinkFragmentId(key, providerId);
737 // remote
738 Timestamp remoteTimestamp = ad.linkTimestamps().get(fragId);
739 if (remoteTimestamp == null) {
740 remoteTimestamp = ad.linkTombstones().get(key);
741 }
742 if (remoteTimestamp == null ||
743 pDesc.isNewer(remoteTimestamp)) {
744 // I have more recent link description. update peer.
745 notifyPeer(sender, new InternalLinkEvent(providerId, pDesc));
746 } else {
747 final Timestamp remoteLive = ad.linkTimestamps().get(fragId);
748 if (remoteLive != null &&
749 remoteLive.compareTo(pDesc.timestamp()) > 0) {
750 // I have something outdated
751 localOutdated = true;
752 }
753 }
754
755 // search local latest along the way
756 if (localLatest == null ||
757 pDesc.isNewer(localLatest)) {
758 localLatest = pDesc.timestamp();
759 }
760 }
761 // Tests if remote remove is more recent then local latest.
762 final Timestamp remoteRemove = ad.linkTombstones().get(key);
763 if (remoteRemove != null) {
764 if (localLatest != null &&
765 localLatest.compareTo(remoteRemove) < 0) {
766 // remote remove is more recent
767 notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
768 }
769 }
Madan Jampania97e8202014-10-10 17:01:33 -0700770 }
771 }
772
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700773 // populate remove info if not known locally
774 for (Entry<LinkKey, Timestamp> remoteRm : ad.linkTombstones().entrySet()) {
775 final LinkKey key = remoteRm.getKey();
776 final Timestamp remoteRemove = remoteRm.getValue();
777 // relying on removeLinkInternal to ignore stale info
778 notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
779 }
Madan Jampania97e8202014-10-10 17:01:33 -0700780
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700781 if (localOutdated) {
782 // send back advertisement to speed up convergence
783 try {
784 unicastMessage(sender, LINK_ANTI_ENTROPY_ADVERTISEMENT,
785 createAdvertisement());
786 } catch (IOException e) {
787 log.debug("Failed to send back active advertisement");
Madan Jampania97e8202014-10-10 17:01:33 -0700788 }
789 }
790 }
791
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800792 private final class InternalLinkEventListener
793 implements ClusterMessageHandler {
Madan Jampani2ff05592014-10-10 15:42:47 -0700794 @Override
795 public void handle(ClusterMessage message) {
796
Yuta HIGUCHIc01d2aa2014-10-19 01:19:34 -0700797 log.trace("Received link event from peer: {}", message.sender());
Madan Jampani2ff05592014-10-10 15:42:47 -0700798 InternalLinkEvent event = (InternalLinkEvent) SERIALIZER.decode(message.payload());
799
800 ProviderId providerId = event.providerId();
801 Timestamped<LinkDescription> linkDescription = event.linkDescription();
802
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800803 executor.submit(new Runnable() {
804
805 @Override
806 public void run() {
807 try {
808 notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
809 } catch (Exception e) {
810 log.warn("Exception thrown handling link event", e);
811 }
812 }
813 });
Madan Jampani2ff05592014-10-10 15:42:47 -0700814 }
815 }
816
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800817 private final class InternalLinkRemovedEventListener
818 implements ClusterMessageHandler {
Madan Jampani2ff05592014-10-10 15:42:47 -0700819 @Override
820 public void handle(ClusterMessage message) {
821
Yuta HIGUCHIc01d2aa2014-10-19 01:19:34 -0700822 log.trace("Received link removed event from peer: {}", message.sender());
Madan Jampani2ff05592014-10-10 15:42:47 -0700823 InternalLinkRemovedEvent event = (InternalLinkRemovedEvent) SERIALIZER.decode(message.payload());
824
825 LinkKey linkKey = event.linkKey();
826 Timestamp timestamp = event.timestamp();
827
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800828 executor.submit(new Runnable() {
829
830 @Override
831 public void run() {
832 try {
833 notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
834 } catch (Exception e) {
835 log.warn("Exception thrown handling link removed", e);
836 }
837 }
838 });
Madan Jampani2ff05592014-10-10 15:42:47 -0700839 }
840 }
Madan Jampania97e8202014-10-10 17:01:33 -0700841
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800842 private final class InternalLinkAntiEntropyAdvertisementListener
843 implements ClusterMessageHandler {
Madan Jampania97e8202014-10-10 17:01:33 -0700844
845 @Override
846 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800847 log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
Madan Jampania97e8202014-10-10 17:01:33 -0700848 LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800849 backgroundExecutors.submit(new Runnable() {
850
851 @Override
852 public void run() {
853 try {
854 handleAntiEntropyAdvertisement(advertisement);
855 } catch (Exception e) {
856 log.warn("Exception thrown while handling Link advertisements", e);
857 throw e;
858 }
859 }
860 });
Madan Jampania97e8202014-10-10 17:01:33 -0700861 }
862 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700863}