blob: c5b6bbd9b80e9459470741eefe3b13762ac38536 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2014-present Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.link.impl;
Madan Jampani2ff05592014-10-10 15:42:47 -070017
Ray Milkeyb7f0f642016-01-22 16:08:14 -080018import java.io.IOException;
19import java.util.Collections;
20import java.util.HashMap;
21import java.util.HashSet;
22import java.util.Map;
23import java.util.Map.Entry;
24import java.util.Set;
25import java.util.concurrent.ConcurrentHashMap;
26import java.util.concurrent.ConcurrentMap;
27import java.util.concurrent.ExecutorService;
28import java.util.concurrent.Executors;
29import java.util.concurrent.ScheduledExecutorService;
30import java.util.concurrent.TimeUnit;
31
Madan Jampania97e8202014-10-10 17:01:33 -070032import org.apache.commons.lang3.RandomUtils;
Madan Jampani2ff05592014-10-10 15:42:47 -070033import org.apache.felix.scr.annotations.Activate;
Madan Jampani2ff05592014-10-10 15:42:47 -070034import org.apache.felix.scr.annotations.Deactivate;
35import org.apache.felix.scr.annotations.Reference;
36import org.apache.felix.scr.annotations.ReferenceCardinality;
37import org.apache.felix.scr.annotations.Service;
Ray Milkey7bbeb3f2014-12-11 14:59:26 -080038import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080039import org.onosproject.cluster.ClusterService;
40import org.onosproject.cluster.ControllerNode;
41import org.onosproject.cluster.NodeId;
Marc De Leenheerb473b9d2015-02-06 15:21:03 -080042import org.onosproject.mastership.MastershipService;
Brian O'Connorabafb502014-12-02 22:26:20 -080043import org.onosproject.net.AnnotationsUtil;
44import org.onosproject.net.ConnectPoint;
45import org.onosproject.net.DefaultAnnotations;
46import org.onosproject.net.DefaultLink;
47import org.onosproject.net.DeviceId;
48import org.onosproject.net.Link;
49import org.onosproject.net.Link.Type;
50import org.onosproject.net.LinkKey;
51import org.onosproject.net.SparseAnnotations;
52import org.onosproject.net.device.DeviceClockService;
53import org.onosproject.net.link.DefaultLinkDescription;
54import org.onosproject.net.link.LinkDescription;
55import org.onosproject.net.link.LinkEvent;
56import org.onosproject.net.link.LinkStore;
57import org.onosproject.net.link.LinkStoreDelegate;
58import org.onosproject.net.provider.ProviderId;
59import org.onosproject.store.AbstractStore;
60import org.onosproject.store.Timestamp;
61import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
62import org.onosproject.store.cluster.messaging.ClusterMessage;
63import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
64import org.onosproject.store.cluster.messaging.MessageSubject;
65import org.onosproject.store.impl.Timestamped;
HIGUCHI Yutae7290652016-05-18 11:29:01 -070066import org.onosproject.store.serializers.StoreSerializer;
Brian O'Connor6de2e202015-05-21 14:30:41 -070067import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
Madan Jampani2ff05592014-10-10 15:42:47 -070068import org.slf4j.Logger;
69
Ray Milkeyb7f0f642016-01-22 16:08:14 -080070import com.google.common.base.Function;
71import com.google.common.collect.FluentIterable;
72import com.google.common.collect.ImmutableList;
73import com.google.common.collect.Multimaps;
74import com.google.common.collect.SetMultimap;
75import com.google.common.collect.Sets;
Madan Jampani2ff05592014-10-10 15:42:47 -070076
sangyun-hanf7fe7632016-02-16 14:47:44 +090077import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuska57126fe2014-11-11 17:13:24 -080078import static com.google.common.base.Preconditions.checkNotNull;
79import static com.google.common.base.Predicates.notNull;
80import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
Madan Jampania97e8202014-10-10 17:01:33 -070081import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080082import static org.onlab.util.Tools.groupedThreads;
Ray Milkey7bbeb3f2014-12-11 14:59:26 -080083import static org.onlab.util.Tools.minPriority;
Brian O'Connorabafb502014-12-02 22:26:20 -080084import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
85import static org.onosproject.net.DefaultAnnotations.merge;
86import static org.onosproject.net.DefaultAnnotations.union;
87import static org.onosproject.net.Link.State.ACTIVE;
88import static org.onosproject.net.Link.State.INACTIVE;
89import static org.onosproject.net.Link.Type.DIRECT;
90import static org.onosproject.net.Link.Type.INDIRECT;
91import static org.onosproject.net.LinkKey.linkKey;
Jian Li11599162016-01-15 15:46:16 -080092import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
93import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
94import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
Brian O'Connorabafb502014-12-02 22:26:20 -080095import static org.onosproject.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
Madan Jampani2ff05592014-10-10 15:42:47 -070096import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani2ff05592014-10-10 15:42:47 -070097
98/**
99 * Manages inventory of infrastructure links in distributed data store
100 * that uses optimistic replication and gossip based techniques.
101 */
Jian Li11599162016-01-15 15:46:16 -0800102//@Component(immediate = true, enabled = false)
Madan Jampani2ff05592014-10-10 15:42:47 -0700103@Service
104public class GossipLinkStore
105 extends AbstractStore<LinkEvent, LinkStoreDelegate>
106 implements LinkStore {
107
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800108 // Timeout in milliseconds to process links on remote master node
109 private static final int REMOTE_MASTER_TIMEOUT = 1000;
110
sangyun-hanf7fe7632016-02-16 14:47:44 +0900111 // Default delay for ScheduledExecutorService of anti-entropy(BackgroundExecutor)
112 private static final long DEFAULT_INITIAL_DELAY = 5;
113
114 // Default period for ScheduledExecutorService of anti-entropy(BackgroundExecutor)
115 private static final long DEFAULT_PERIOD = 5;
116
117 private static long initialDelaySec = DEFAULT_INITIAL_DELAY;
118 private static long periodSec = DEFAULT_PERIOD;
119
Madan Jampani2ff05592014-10-10 15:42:47 -0700120 private final Logger log = getLogger(getClass());
121
122 // Link inventory
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700123 private final ConcurrentMap<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>> linkDescs =
Madan Jampani2ff05592014-10-10 15:42:47 -0700124 new ConcurrentHashMap<>();
125
126 // Link instance cache
127 private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();
128
129 // Egress and ingress link sets
130 private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
131 private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();
132
133 // Remove links
Yuta HIGUCHIb9125562014-12-01 23:28:22 -0800134 private final Map<LinkKey, Timestamp> removedLinks = new ConcurrentHashMap<>();
Madan Jampani2ff05592014-10-10 15:42:47 -0700135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI093e83e2014-10-10 22:26:11 -0700137 protected DeviceClockService deviceClockService;
Madan Jampani2ff05592014-10-10 15:42:47 -0700138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected ClusterCommunicationService clusterCommunicator;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143 protected ClusterService clusterService;
144
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800145 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
146 protected MastershipService mastershipService;
147
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700148 protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
149 KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800150 .register(DistributedStoreSerializers.STORE_COMMON)
151 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Madan Jampani2ff05592014-10-10 15:42:47 -0700152 .register(InternalLinkEvent.class)
153 .register(InternalLinkRemovedEvent.class)
Madan Jampanid2054d42014-10-10 17:27:06 -0700154 .register(LinkAntiEntropyAdvertisement.class)
155 .register(LinkFragmentId.class)
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800156 .register(LinkInjectedEvent.class)
HIGUCHI Yutae7290652016-05-18 11:29:01 -0700157 .build("GossipLink"));
Madan Jampani2ff05592014-10-10 15:42:47 -0700158
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800159 private ExecutorService executor;
160
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800161 private ScheduledExecutorService backgroundExecutors;
Madan Jampania97e8202014-10-10 17:01:33 -0700162
Madan Jampani2ff05592014-10-10 15:42:47 -0700163 @Activate
164 public void activate() {
165
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800166 executor = Executors.newCachedThreadPool(groupedThreads("onos/link", "fg-%d"));
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800167
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800168 backgroundExecutors =
Thomas Vachuska6f94ded2015-02-21 14:02:38 -0800169 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/link", "bg-%d")));
Madan Jampania97e8202014-10-10 17:01:33 -0700170
Madan Jampani2af244a2015-02-22 13:12:01 -0800171 clusterCommunicator.addSubscriber(
172 GossipLinkStoreMessageSubjects.LINK_UPDATE,
173 new InternalLinkEventListener(), executor);
174 clusterCommunicator.addSubscriber(
175 GossipLinkStoreMessageSubjects.LINK_REMOVED,
176 new InternalLinkRemovedEventListener(), executor);
177 clusterCommunicator.addSubscriber(
178 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
179 new InternalLinkAntiEntropyAdvertisementListener(), backgroundExecutors);
180 clusterCommunicator.addSubscriber(
181 GossipLinkStoreMessageSubjects.LINK_INJECTED,
182 new LinkInjectedEventListener(), executor);
183
Madan Jampania97e8202014-10-10 17:01:33 -0700184 // start anti-entropy thread
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800185 backgroundExecutors.scheduleAtFixedRate(new SendAdvertisementTask(),
Madan Jampania97e8202014-10-10 17:01:33 -0700186 initialDelaySec, periodSec, TimeUnit.SECONDS);
Madan Jampani2ff05592014-10-10 15:42:47 -0700187
188 log.info("Started");
189 }
190
191 @Deactivate
192 public void deactivate() {
Madan Jampani3ffbb272014-10-13 11:19:37 -0700193
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800194 executor.shutdownNow();
195
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800196 backgroundExecutors.shutdownNow();
Madan Jampani3ffbb272014-10-13 11:19:37 -0700197 try {
Yuta HIGUCHI06586272014-11-25 14:27:03 -0800198 if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
Madan Jampani3ffbb272014-10-13 11:19:37 -0700199 log.error("Timeout during executor shutdown");
200 }
201 } catch (InterruptedException e) {
202 log.error("Error during executor shutdown", e);
203 }
204
Madan Jampani2ff05592014-10-10 15:42:47 -0700205 linkDescs.clear();
206 links.clear();
207 srcLinks.clear();
208 dstLinks.clear();
209 log.info("Stopped");
210 }
211
212 @Override
213 public int getLinkCount() {
214 return links.size();
215 }
216
217 @Override
218 public Iterable<Link> getLinks() {
219 return Collections.unmodifiableCollection(links.values());
220 }
221
222 @Override
223 public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
224 // lock for iteration
225 synchronized (srcLinks) {
226 return FluentIterable.from(srcLinks.get(deviceId))
227 .transform(lookupLink())
228 .filter(notNull())
229 .toSet();
230 }
231 }
232
233 @Override
234 public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
235 // lock for iteration
236 synchronized (dstLinks) {
237 return FluentIterable.from(dstLinks.get(deviceId))
238 .transform(lookupLink())
239 .filter(notNull())
240 .toSet();
241 }
242 }
243
244 @Override
245 public Link getLink(ConnectPoint src, ConnectPoint dst) {
Yuta HIGUCHI18ab8a92014-10-13 11:16:19 -0700246 return links.get(linkKey(src, dst));
Madan Jampani2ff05592014-10-10 15:42:47 -0700247 }
248
249 @Override
250 public Set<Link> getEgressLinks(ConnectPoint src) {
251 Set<Link> egress = new HashSet<>();
HIGUCHI Yuta4d973eb2015-02-26 14:28:49 -0800252 //
253 // Change `srcLinks` to ConcurrentMap<DeviceId, (Concurrent)Set>
254 // to remove this synchronized block, if we hit performance issue.
255 // SetMultiMap#get returns wrapped collection to provide modifiable-view.
256 // And the wrapped collection is not concurrent access safe.
257 //
258 // Our use case here does not require returned collection to be modifiable,
259 // so the wrapped collection forces us to lock the whole multiset,
260 // for benefit we don't need.
261 //
262 // Same applies to `dstLinks`
263 synchronized (srcLinks) {
264 for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
265 if (linkKey.src().equals(src)) {
266 Link link = links.get(linkKey);
267 if (link != null) {
268 egress.add(link);
269 } else {
270 log.debug("Egress link for {} was null, skipped", linkKey);
271 }
Ray Milkey7bbeb3f2014-12-11 14:59:26 -0800272 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700273 }
274 }
275 return egress;
276 }
277
278 @Override
279 public Set<Link> getIngressLinks(ConnectPoint dst) {
280 Set<Link> ingress = new HashSet<>();
HIGUCHI Yuta4d973eb2015-02-26 14:28:49 -0800281 synchronized (dstLinks) {
282 for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
283 if (linkKey.dst().equals(dst)) {
284 Link link = links.get(linkKey);
285 if (link != null) {
286 ingress.add(link);
287 } else {
288 log.debug("Ingress link for {} was null, skipped", linkKey);
289 }
Ray Milkey7bbeb3f2014-12-11 14:59:26 -0800290 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700291 }
292 }
293 return ingress;
294 }
295
296 @Override
297 public LinkEvent createOrUpdateLink(ProviderId providerId,
298 LinkDescription linkDescription) {
299
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800300 final DeviceId dstDeviceId = linkDescription.dst().deviceId();
301 final NodeId localNode = clusterService.getLocalNode().id();
302 final NodeId dstNode = mastershipService.getMasterFor(dstDeviceId);
Madan Jampani2ff05592014-10-10 15:42:47 -0700303
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800304 // Process link update only if we're the master of the destination node,
305 // otherwise signal the actual master.
306 LinkEvent linkEvent = null;
307 if (localNode.equals(dstNode)) {
Madan Jampani2ff05592014-10-10 15:42:47 -0700308
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800309 Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
310
311 final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
312
313 LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
314 final Timestamped<LinkDescription> mergedDesc;
315 Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
316
317 synchronized (map) {
318 linkEvent = createOrUpdateLinkInternal(providerId, deltaDesc);
319 mergedDesc = map.get(providerId);
320 }
321
322 if (linkEvent != null) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700323 log.debug("Notifying peers of a link update topology event from providerId: "
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800324 + "{} between src: {} and dst: {}",
325 providerId, linkDescription.src(), linkDescription.dst());
326 notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
327 }
328
329 } else {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800330 // Only forward for ConfigProvider
331 // Forwarding was added as a workaround for ONOS-490
HIGUCHI Yuta4ea4e422016-01-13 16:40:34 -0800332 if (!providerId.scheme().equals("cfg")) {
HIGUCHI Yuta1979f552015-12-28 21:24:26 -0800333 return null;
334 }
HIGUCHI Yutadc2e7c22015-02-24 12:19:47 -0800335 // FIXME Temporary hack for NPE (ONOS-1171).
336 // Proper fix is to implement forwarding to master on ConfigProvider
337 // redo ONOS-490
338 if (dstNode == null) {
339 // silently ignore
340 return null;
341 }
342
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800343
344 LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800345
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800346 // TODO check unicast return value
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700347 clusterCommunicator.unicast(linkInjectedEvent,
348 GossipLinkStoreMessageSubjects.LINK_INJECTED,
349 SERIALIZER::encode,
350 dstNode);
Yuta HIGUCHI92cd51e2014-10-13 10:51:45 -0700351 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700352
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800353 return linkEvent;
Madan Jampani2ff05592014-10-10 15:42:47 -0700354 }
355
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800356 @Override
357 public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) {
358 Link link = getLink(src, dst);
359 if (link == null) {
360 return null;
361 }
362
363 if (link.isDurable()) {
364 // FIXME: this is not the right thing to call for the gossip store; will not sync link state!!!
365 return link.state() == INACTIVE ? null :
366 updateLink(linkKey(link.src(), link.dst()), link,
Ray Milkey2693bda2016-01-22 16:08:14 -0800367 DefaultLink.builder()
368 .providerId(link.providerId())
369 .src(link.src())
370 .dst(link.dst())
371 .type(link.type())
372 .state(INACTIVE)
373 .isExpected(link.isExpected())
374 .annotations(link.annotations())
375 .build());
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800376 }
377 return removeLink(src, dst);
378 }
379
Madan Jampani2ff05592014-10-10 15:42:47 -0700380 private LinkEvent createOrUpdateLinkInternal(
381 ProviderId providerId,
382 Timestamped<LinkDescription> linkDescription) {
383
Yuta HIGUCHIa85542b2014-10-21 19:29:49 -0700384 final LinkKey key = linkKey(linkDescription.value().src(),
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800385 linkDescription.value().dst());
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700386 Map<ProviderId, Timestamped<LinkDescription>> descs = getOrCreateLinkDescriptions(key);
Madan Jampani2ff05592014-10-10 15:42:47 -0700387
388 synchronized (descs) {
389 // if the link was previously removed, we should proceed if and
390 // only if this request is more recent.
391 Timestamp linkRemovedTimestamp = removedLinks.get(key);
392 if (linkRemovedTimestamp != null) {
Jonathan Hart403ea932015-02-20 16:23:00 -0800393 if (linkDescription.isNewerThan(linkRemovedTimestamp)) {
Madan Jampani2ff05592014-10-10 15:42:47 -0700394 removedLinks.remove(key);
395 } else {
Yuta HIGUCHIa85542b2014-10-21 19:29:49 -0700396 log.trace("Link {} was already removed ignoring.", key);
Madan Jampani2ff05592014-10-10 15:42:47 -0700397 return null;
398 }
399 }
400
401 final Link oldLink = links.get(key);
402 // update description
403 createOrUpdateLinkDescription(descs, providerId, linkDescription);
404 final Link newLink = composeLink(descs);
405 if (oldLink == null) {
406 return createLink(key, newLink);
407 }
408 return updateLink(key, oldLink, newLink);
409 }
410 }
411
412 // Guarded by linkDescs value (=locking each Link)
413 private Timestamped<LinkDescription> createOrUpdateLinkDescription(
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700414 Map<ProviderId, Timestamped<LinkDescription>> descs,
Madan Jampani2ff05592014-10-10 15:42:47 -0700415 ProviderId providerId,
416 Timestamped<LinkDescription> linkDescription) {
417
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700418 // merge existing annotations
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700419 Timestamped<LinkDescription> existingLinkDescription = descs.get(providerId);
Madan Jampani2ff05592014-10-10 15:42:47 -0700420 if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) {
Yuta HIGUCHIa85542b2014-10-21 19:29:49 -0700421 log.trace("local info is more up-to-date, ignoring {}.", linkDescription);
Madan Jampani2ff05592014-10-10 15:42:47 -0700422 return null;
423 }
424 Timestamped<LinkDescription> newLinkDescription = linkDescription;
425 if (existingLinkDescription != null) {
Yuta HIGUCHIa85542b2014-10-21 19:29:49 -0700426 // we only allow transition from INDIRECT -> DIRECT
427 final Type newType;
428 if (existingLinkDescription.value().type() == DIRECT) {
429 newType = DIRECT;
430 } else {
431 newType = linkDescription.value().type();
432 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700433 SparseAnnotations merged = union(existingLinkDescription.value().annotations(),
434 linkDescription.value().annotations());
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800435 newLinkDescription = new Timestamped<>(
Madan Jampani2ff05592014-10-10 15:42:47 -0700436 new DefaultLinkDescription(
437 linkDescription.value().src(),
438 linkDescription.value().dst(),
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800439 newType,
440 existingLinkDescription.value().isExpected(),
441 merged),
Madan Jampani2ff05592014-10-10 15:42:47 -0700442 linkDescription.timestamp());
443 }
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700444 return descs.put(providerId, newLinkDescription);
Madan Jampani2ff05592014-10-10 15:42:47 -0700445 }
446
447 // Creates and stores the link and returns the appropriate event.
448 // Guarded by linkDescs value (=locking each Link)
449 private LinkEvent createLink(LinkKey key, Link newLink) {
Madan Jampani2ff05592014-10-10 15:42:47 -0700450 links.put(key, newLink);
451 srcLinks.put(newLink.src().deviceId(), key);
452 dstLinks.put(newLink.dst().deviceId(), key);
453 return new LinkEvent(LINK_ADDED, newLink);
454 }
455
456 // Updates, if necessary the specified link and returns the appropriate event.
457 // Guarded by linkDescs value (=locking each Link)
458 private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
Yuta HIGUCHIa85542b2014-10-21 19:29:49 -0700459 // Note: INDIRECT -> DIRECT transition only
460 // so that BDDP discovered Link will not overwrite LDDP Link
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800461 if (oldLink.state() != newLink.state() ||
462 (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
Madan Jampani2ff05592014-10-10 15:42:47 -0700463 !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
464
465 links.put(key, newLink);
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800466 // strictly speaking following can be omitted
Madan Jampani2ff05592014-10-10 15:42:47 -0700467 srcLinks.put(oldLink.src().deviceId(), key);
468 dstLinks.put(oldLink.dst().deviceId(), key);
469 return new LinkEvent(LINK_UPDATED, newLink);
470 }
471 return null;
472 }
473
474 @Override
475 public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
Yuta HIGUCHI18ab8a92014-10-13 11:16:19 -0700476 final LinkKey key = linkKey(src, dst);
Madan Jampani2ff05592014-10-10 15:42:47 -0700477
478 DeviceId dstDeviceId = dst.deviceId();
Ayaka Koshibeb5c63a02014-10-18 18:42:27 -0700479 Timestamp timestamp = null;
480 try {
481 timestamp = deviceClockService.getTimestamp(dstDeviceId);
482 } catch (IllegalStateException e) {
Thomas Vachuska6f90c592015-06-03 20:06:58 -0700483 log.debug("Failed to remove link {}, was not the master", key);
484 // there are times when this is called before mastership
Ayaka Koshibeb5c63a02014-10-18 18:42:27 -0700485 // handoff correctly completes.
486 return null;
487 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700488
489 LinkEvent event = removeLinkInternal(key, timestamp);
490
491 if (event != null) {
Madan Jampanif2af7712015-05-29 18:43:52 -0700492 log.debug("Notifying peers of a link removed topology event for a link "
Madan Jampani2ff05592014-10-10 15:42:47 -0700493 + "between src: {} and dst: {}", src, dst);
Jonathan Hart7d656f42015-01-27 14:07:23 -0800494 notifyPeers(new InternalLinkRemovedEvent(key, timestamp));
Madan Jampani2ff05592014-10-10 15:42:47 -0700495 }
496 return event;
497 }
498
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700499 private static Timestamped<LinkDescription> getPrimaryDescription(
500 Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
501
Madan Jampani2ff05592014-10-10 15:42:47 -0700502 synchronized (linkDescriptions) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700503 for (Entry<ProviderId, Timestamped<LinkDescription>>
504 e : linkDescriptions.entrySet()) {
505
506 if (!e.getKey().isAncillary()) {
507 return e.getValue();
508 }
509 }
510 }
511 return null;
512 }
513
514
515 // TODO: consider slicing out as Timestamp utils
516 /**
517 * Checks is timestamp is more recent than timestamped object.
518 *
519 * @param timestamp to check if this is more recent then other
520 * @param timestamped object to be tested against
521 * @return true if {@code timestamp} is more recent than {@code timestamped}
522 * or {@code timestamped is null}
523 */
524 private static boolean isMoreRecent(Timestamp timestamp, Timestamped<?> timestamped) {
525 checkNotNull(timestamp);
526 if (timestamped == null) {
527 return true;
528 }
529 return timestamp.compareTo(timestamped.timestamp()) > 0;
530 }
531
532 private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
533 Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions
534 = getOrCreateLinkDescriptions(key);
535
536 synchronized (linkDescriptions) {
537 if (linkDescriptions.isEmpty()) {
538 // never seen such link before. keeping timestamp for record
539 removedLinks.put(key, timestamp);
540 return null;
541 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700542 // accept removal request if given timestamp is newer than
543 // the latest Timestamp from Primary provider
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700544 Timestamped<LinkDescription> prim = getPrimaryDescription(linkDescriptions);
545 if (!isMoreRecent(timestamp, prim)) {
546 // outdated remove request, ignore
Madan Jampani2ff05592014-10-10 15:42:47 -0700547 return null;
548 }
549 removedLinks.put(key, timestamp);
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800550 Link link = links.remove(key);
Madan Jampani2ff05592014-10-10 15:42:47 -0700551 linkDescriptions.clear();
552 if (link != null) {
553 srcLinks.remove(link.src().deviceId(), key);
554 dstLinks.remove(link.dst().deviceId(), key);
555 return new LinkEvent(LINK_REMOVED, link);
556 }
557 return null;
558 }
559 }
560
Yuta HIGUCHI800fac62014-12-11 19:23:01 -0800561 /**
562 * Creates concurrent readable, synchronized HashMultimap.
563 *
564 * @return SetMultimap
565 */
Madan Jampani2ff05592014-10-10 15:42:47 -0700566 private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
Yuta HIGUCHI800fac62014-12-11 19:23:01 -0800567 return synchronizedSetMultimap(
Sho SHIMIZU7a4087b2015-09-10 09:23:16 -0700568 Multimaps.newSetMultimap(new ConcurrentHashMap<>(),
Yuta HIGUCHI800fac62014-12-11 19:23:01 -0800569 () -> Sets.newConcurrentHashSet()));
Madan Jampani2ff05592014-10-10 15:42:47 -0700570 }
571
572 /**
573 * @return primary ProviderID, or randomly chosen one if none exists
574 */
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700575 private static ProviderId pickBaseProviderId(
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700576 Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
Madan Jampani2ff05592014-10-10 15:42:47 -0700577
578 ProviderId fallBackPrimary = null;
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700579 for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) {
Madan Jampani2ff05592014-10-10 15:42:47 -0700580 if (!e.getKey().isAncillary()) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700581 // found primary
Madan Jampani2ff05592014-10-10 15:42:47 -0700582 return e.getKey();
583 } else if (fallBackPrimary == null) {
584 // pick randomly as a fallback in case there is no primary
585 fallBackPrimary = e.getKey();
586 }
587 }
588 return fallBackPrimary;
589 }
590
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700591 // Guarded by linkDescs value (=locking each Link)
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700592 private Link composeLink(Map<ProviderId, Timestamped<LinkDescription>> descs) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700593 ProviderId baseProviderId = pickBaseProviderId(descs);
594 Timestamped<LinkDescription> base = descs.get(baseProviderId);
Madan Jampani2ff05592014-10-10 15:42:47 -0700595
596 ConnectPoint src = base.value().src();
597 ConnectPoint dst = base.value().dst();
598 Type type = base.value().type();
599 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
600 annotations = merge(annotations, base.value().annotations());
601
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700602 for (Entry<ProviderId, Timestamped<LinkDescription>> e : descs.entrySet()) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700603 if (baseProviderId.equals(e.getKey())) {
Madan Jampani2ff05592014-10-10 15:42:47 -0700604 continue;
605 }
606
Yuta HIGUCHI65934892014-12-04 17:47:44 -0800607 // Note: In the long run we should keep track of Description timestamp
Madan Jampani2ff05592014-10-10 15:42:47 -0700608 // and only merge conflicting keys when timestamp is newer
609 // Currently assuming there will never be a key conflict between
610 // providers
611
612 // annotation merging. not so efficient, should revisit later
613 annotations = merge(annotations, e.getValue().value().annotations());
614 }
615
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800616 //boolean isDurable = Objects.equals(annotations.value(AnnotationKeys.DURABLE), "true");
617
618 // TEMP
619 Link.State initialLinkState = base.value().isExpected() ? ACTIVE : INACTIVE;
Ray Milkey2693bda2016-01-22 16:08:14 -0800620 return DefaultLink.builder()
621 .providerId(baseProviderId)
622 .src(src)
623 .dst(dst)
624 .type(type)
Ray Milkeyb7f0f642016-01-22 16:08:14 -0800625 .state(initialLinkState)
626 .isExpected(base.value().isExpected())
Ray Milkey2693bda2016-01-22 16:08:14 -0800627 .annotations(annotations)
628 .build();
Madan Jampani2ff05592014-10-10 15:42:47 -0700629 }
630
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700631 private Map<ProviderId, Timestamped<LinkDescription>> getOrCreateLinkDescriptions(LinkKey key) {
Yuta HIGUCHI3e1a5bf2014-10-14 19:39:58 -0700632 Map<ProviderId, Timestamped<LinkDescription>> r;
633 r = linkDescs.get(key);
634 if (r != null) {
635 return r;
636 }
637 r = new HashMap<>();
638 final Map<ProviderId, Timestamped<LinkDescription>> concurrentlyAdded;
639 concurrentlyAdded = linkDescs.putIfAbsent(key, r);
640 if (concurrentlyAdded != null) {
641 return concurrentlyAdded;
642 } else {
643 return r;
644 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700645 }
646
647 private final Function<LinkKey, Link> lookupLink = new LookupLink();
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800648
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700649 /**
650 * Returns a Function to lookup Link instance using LinkKey from cache.
Thomas Vachuska57126fe2014-11-11 17:13:24 -0800651 *
652 * @return lookup link function
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700653 */
Madan Jampani2ff05592014-10-10 15:42:47 -0700654 private Function<LinkKey, Link> lookupLink() {
655 return lookupLink;
656 }
657
658 private final class LookupLink implements Function<LinkKey, Link> {
659 @Override
660 public Link apply(LinkKey input) {
Yuta HIGUCHI023295a2014-10-15 23:29:46 -0700661 if (input == null) {
662 return null;
663 } else {
664 return links.get(input);
665 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700666 }
667 }
668
Madan Jampani2ff05592014-10-10 15:42:47 -0700669 private void notifyDelegateIfNotNull(LinkEvent event) {
670 if (event != null) {
671 notifyDelegate(event);
672 }
673 }
674
Jonathan Hart7d656f42015-01-27 14:07:23 -0800675 private void broadcastMessage(MessageSubject subject, Object event) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700676 clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
Madan Jampani2ff05592014-10-10 15:42:47 -0700677 }
678
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700679 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700680 clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
Madan Jampania97e8202014-10-10 17:01:33 -0700681 }
682
Jonathan Hart7d656f42015-01-27 14:07:23 -0800683 private void notifyPeers(InternalLinkEvent event) {
Madan Jampani2ff05592014-10-10 15:42:47 -0700684 broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
685 }
686
Jonathan Hart7d656f42015-01-27 14:07:23 -0800687 private void notifyPeers(InternalLinkRemovedEvent event) {
Madan Jampani2ff05592014-10-10 15:42:47 -0700688 broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
689 }
690
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700691 // notify peer, silently ignoring error
Madan Jampania97e8202014-10-10 17:01:33 -0700692 private void notifyPeer(NodeId peer, InternalLinkEvent event) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700693 try {
694 unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
695 } catch (IOException e) {
696 log.debug("Failed to notify peer {} with message {}", peer, event);
697 }
Madan Jampania97e8202014-10-10 17:01:33 -0700698 }
699
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700700 // notify peer, silently ignoring error
Madan Jampania97e8202014-10-10 17:01:33 -0700701 private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700702 try {
703 unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
704 } catch (IOException e) {
705 log.debug("Failed to notify peer {} with message {}", peer, event);
706 }
Madan Jampania97e8202014-10-10 17:01:33 -0700707 }
708
sangyun-hanf7fe7632016-02-16 14:47:44 +0900709 /**
710 * sets the time to delay first execution for anti-entropy.
711 * (scheduleAtFixedRate of ScheduledExecutorService)
712 *
713 * @param delay the time to delay first execution for anti-entropy
714 */
715 private void setInitialDelaySec(long delay) {
716 checkArgument(delay >= 0, "Initial delay of scheduleAtFixedRate() must be 0 or more");
717 initialDelaySec = delay;
718 }
719
720 /**
721 * sets the period between successive execution for anti-entropy.
722 * (scheduleAtFixedRate of ScheduledExecutorService)
723 *
724 * @param period the period between successive execution for anti-entropy
725 */
726 private void setPeriodSec(long period) {
727 checkArgument(period > 0, "Period of scheduleAtFixedRate() must be greater than 0");
728 periodSec = period;
729 }
730
Madan Jampania97e8202014-10-10 17:01:33 -0700731 private final class SendAdvertisementTask implements Runnable {
732
733 @Override
734 public void run() {
735 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800736 log.debug("Interrupted, quitting");
Madan Jampania97e8202014-10-10 17:01:33 -0700737 return;
738 }
739
740 try {
741 final NodeId self = clusterService.getLocalNode().id();
742 Set<ControllerNode> nodes = clusterService.getNodes();
743
744 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
745 .transform(toNodeId())
746 .toList();
747
748 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800749 log.trace("No other peers in the cluster.");
Madan Jampania97e8202014-10-10 17:01:33 -0700750 return;
751 }
752
753 NodeId peer;
754 do {
755 int idx = RandomUtils.nextInt(0, nodeIds.size());
756 peer = nodeIds.get(idx);
757 } while (peer.equals(self));
758
759 LinkAntiEntropyAdvertisement ad = createAdvertisement();
760
761 if (Thread.currentThread().isInterrupted()) {
Yuta HIGUCHI1a012722014-11-20 15:21:41 -0800762 log.debug("Interrupted, quitting");
Madan Jampania97e8202014-10-10 17:01:33 -0700763 return;
764 }
765
766 try {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700767 unicastMessage(peer, LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
768 } catch (IOException e) {
769 log.debug("Failed to send anti-entropy advertisement to {}", peer);
Madan Jampania97e8202014-10-10 17:01:33 -0700770 return;
771 }
772 } catch (Exception e) {
773 // catch all Exception to avoid Scheduled task being suppressed.
774 log.error("Exception thrown while sending advertisement", e);
775 }
776 }
777 }
778
779 private LinkAntiEntropyAdvertisement createAdvertisement() {
780 final NodeId self = clusterService.getLocalNode().id();
781
782 Map<LinkFragmentId, Timestamp> linkTimestamps = new HashMap<>(linkDescs.size());
783 Map<LinkKey, Timestamp> linkTombstones = new HashMap<>(removedLinks.size());
784
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -0800785 linkDescs.forEach((linkKey, linkDesc) -> {
Madan Jampania97e8202014-10-10 17:01:33 -0700786 synchronized (linkDesc) {
787 for (Map.Entry<ProviderId, Timestamped<LinkDescription>> e : linkDesc.entrySet()) {
788 linkTimestamps.put(new LinkFragmentId(linkKey, e.getKey()), e.getValue().timestamp());
789 }
790 }
Yuta HIGUCHIb6cfac32014-11-25 13:37:27 -0800791 });
Madan Jampania97e8202014-10-10 17:01:33 -0700792
793 linkTombstones.putAll(removedLinks);
794
795 return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
796 }
797
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700798 private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement ad) {
Madan Jampania97e8202014-10-10 17:01:33 -0700799
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700800 final NodeId sender = ad.sender();
801 boolean localOutdated = false;
Madan Jampania97e8202014-10-10 17:01:33 -0700802
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700803 for (Entry<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>>
804 l : linkDescs.entrySet()) {
Madan Jampania97e8202014-10-10 17:01:33 -0700805
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700806 final LinkKey key = l.getKey();
807 final Map<ProviderId, Timestamped<LinkDescription>> link = l.getValue();
808 synchronized (link) {
809 Timestamp localLatest = removedLinks.get(key);
Madan Jampania97e8202014-10-10 17:01:33 -0700810
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700811 for (Entry<ProviderId, Timestamped<LinkDescription>> p : link.entrySet()) {
812 final ProviderId providerId = p.getKey();
813 final Timestamped<LinkDescription> pDesc = p.getValue();
Madan Jampania97e8202014-10-10 17:01:33 -0700814
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700815 final LinkFragmentId fragId = new LinkFragmentId(key, providerId);
816 // remote
817 Timestamp remoteTimestamp = ad.linkTimestamps().get(fragId);
818 if (remoteTimestamp == null) {
819 remoteTimestamp = ad.linkTombstones().get(key);
820 }
821 if (remoteTimestamp == null ||
Jonathan Hart403ea932015-02-20 16:23:00 -0800822 pDesc.isNewerThan(remoteTimestamp)) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700823 // I have more recent link description. update peer.
824 notifyPeer(sender, new InternalLinkEvent(providerId, pDesc));
825 } else {
826 final Timestamp remoteLive = ad.linkTimestamps().get(fragId);
827 if (remoteLive != null &&
828 remoteLive.compareTo(pDesc.timestamp()) > 0) {
829 // I have something outdated
830 localOutdated = true;
831 }
832 }
833
834 // search local latest along the way
835 if (localLatest == null ||
Jonathan Hart403ea932015-02-20 16:23:00 -0800836 pDesc.isNewerThan(localLatest)) {
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700837 localLatest = pDesc.timestamp();
838 }
839 }
840 // Tests if remote remove is more recent then local latest.
841 final Timestamp remoteRemove = ad.linkTombstones().get(key);
842 if (remoteRemove != null) {
843 if (localLatest != null &&
844 localLatest.compareTo(remoteRemove) < 0) {
845 // remote remove is more recent
846 notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
847 }
848 }
Madan Jampania97e8202014-10-10 17:01:33 -0700849 }
850 }
851
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700852 // populate remove info if not known locally
853 for (Entry<LinkKey, Timestamp> remoteRm : ad.linkTombstones().entrySet()) {
854 final LinkKey key = remoteRm.getKey();
855 final Timestamp remoteRemove = remoteRm.getValue();
856 // relying on removeLinkInternal to ignore stale info
857 notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
858 }
Madan Jampania97e8202014-10-10 17:01:33 -0700859
Yuta HIGUCHI2fe46a22014-10-15 22:51:02 -0700860 if (localOutdated) {
861 // send back advertisement to speed up convergence
862 try {
863 unicastMessage(sender, LINK_ANTI_ENTROPY_ADVERTISEMENT,
864 createAdvertisement());
865 } catch (IOException e) {
866 log.debug("Failed to send back active advertisement");
Madan Jampania97e8202014-10-10 17:01:33 -0700867 }
868 }
869 }
870
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800871 private final class InternalLinkEventListener
872 implements ClusterMessageHandler {
Madan Jampani2ff05592014-10-10 15:42:47 -0700873 @Override
874 public void handle(ClusterMessage message) {
875
Yuta HIGUCHIc01d2aa2014-10-19 01:19:34 -0700876 log.trace("Received link event from peer: {}", message.sender());
Sho SHIMIZU5eb79c52015-09-29 14:32:49 -0700877 InternalLinkEvent event = SERIALIZER.decode(message.payload());
Madan Jampani2ff05592014-10-10 15:42:47 -0700878
879 ProviderId providerId = event.providerId();
880 Timestamped<LinkDescription> linkDescription = event.linkDescription();
881
Madan Jampani2af244a2015-02-22 13:12:01 -0800882 try {
883 notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
884 } catch (Exception e) {
885 log.warn("Exception thrown handling link event", e);
886 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700887 }
888 }
889
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800890 private final class InternalLinkRemovedEventListener
891 implements ClusterMessageHandler {
Madan Jampani2ff05592014-10-10 15:42:47 -0700892 @Override
893 public void handle(ClusterMessage message) {
894
Yuta HIGUCHIc01d2aa2014-10-19 01:19:34 -0700895 log.trace("Received link removed event from peer: {}", message.sender());
Sho SHIMIZU5eb79c52015-09-29 14:32:49 -0700896 InternalLinkRemovedEvent event = SERIALIZER.decode(message.payload());
Madan Jampani2ff05592014-10-10 15:42:47 -0700897
898 LinkKey linkKey = event.linkKey();
899 Timestamp timestamp = event.timestamp();
900
Madan Jampani2af244a2015-02-22 13:12:01 -0800901 try {
902 notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
903 } catch (Exception e) {
904 log.warn("Exception thrown handling link removed", e);
905 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700906 }
907 }
Madan Jampania97e8202014-10-10 17:01:33 -0700908
Yuta HIGUCHI80d56592014-11-25 15:11:13 -0800909 private final class InternalLinkAntiEntropyAdvertisementListener
910 implements ClusterMessageHandler {
Madan Jampania97e8202014-10-10 17:01:33 -0700911
912 @Override
913 public void handle(ClusterMessage message) {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800914 log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
Madan Jampania97e8202014-10-10 17:01:33 -0700915 LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
Madan Jampani2af244a2015-02-22 13:12:01 -0800916 try {
917 handleAntiEntropyAdvertisement(advertisement);
918 } catch (Exception e) {
919 log.warn("Exception thrown while handling Link advertisements", e);
920 throw e;
921 }
Madan Jampania97e8202014-10-10 17:01:33 -0700922 }
923 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800924
925 private final class LinkInjectedEventListener
926 implements ClusterMessageHandler {
927 @Override
928 public void handle(ClusterMessage message) {
929
930 log.trace("Received injected link event from peer: {}", message.sender());
931 LinkInjectedEvent linkInjectedEvent = SERIALIZER.decode(message.payload());
932
933 ProviderId providerId = linkInjectedEvent.providerId();
934 LinkDescription linkDescription = linkInjectedEvent.linkDescription();
935
HIGUCHI Yuta0b4d2982015-02-27 23:18:52 -0800936 final DeviceId deviceId = linkDescription.dst().deviceId();
937 if (!deviceClockService.isTimestampAvailable(deviceId)) {
938 // workaround for ONOS-1208
939 log.warn("Not ready to accept update. Dropping {}", linkDescription);
940 return;
941 }
942
Madan Jampani2af244a2015-02-22 13:12:01 -0800943 try {
944 createOrUpdateLink(providerId, linkDescription);
945 } catch (Exception e) {
946 log.warn("Exception thrown while handling link injected event", e);
947 }
Marc De Leenheerb473b9d2015-02-06 15:21:03 -0800948 }
949 }
Madan Jampani2ff05592014-10-10 15:42:47 -0700950}