blob: 5f5184fdae34e74a2d7eda2d40ae908348d67476 [file] [log] [blame]
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -07001package org.onlab.onos.store.link.impl;
2
3import static com.google.common.cache.CacheBuilder.newBuilder;
4import static org.onlab.onos.net.Link.Type.DIRECT;
5import static org.onlab.onos.net.Link.Type.INDIRECT;
6import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED;
7import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
8import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED;
9import static org.slf4j.LoggerFactory.getLogger;
10
11import java.util.HashSet;
12import java.util.Set;
Yuta HIGUCHIb5df76d2014-09-27 20:54:00 -070013
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -070014import org.apache.felix.scr.annotations.Activate;
15import org.apache.felix.scr.annotations.Component;
16import org.apache.felix.scr.annotations.Deactivate;
17import org.apache.felix.scr.annotations.Service;
18import org.onlab.onos.net.ConnectPoint;
19import org.onlab.onos.net.DefaultLink;
20import org.onlab.onos.net.DeviceId;
21import org.onlab.onos.net.Link;
22import org.onlab.onos.net.LinkKey;
23import org.onlab.onos.net.link.LinkDescription;
24import org.onlab.onos.net.link.LinkEvent;
25import org.onlab.onos.net.link.LinkStore;
26import org.onlab.onos.net.link.LinkStoreDelegate;
27import org.onlab.onos.net.provider.ProviderId;
Yuta HIGUCHIb5df76d2014-09-27 20:54:00 -070028import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
29import org.onlab.onos.store.common.AbstractHazelcastStore;
30import org.onlab.onos.store.common.OptionalCacheLoader;
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -070031import org.slf4j.Logger;
32
33import com.google.common.base.Optional;
34import com.google.common.cache.LoadingCache;
35import com.google.common.collect.HashMultimap;
36import com.google.common.collect.ImmutableSet;
37import com.google.common.collect.Multimap;
38import com.google.common.collect.ImmutableSet.Builder;
39import com.hazelcast.core.IMap;
40
Yuta HIGUCHIc99a8d32014-10-02 17:16:34 -070041//TODO: Add support for multiple provider and annotations
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -070042/**
43 * Manages inventory of infrastructure links using Hazelcast-backed map.
44 */
45@Component(immediate = true)
46@Service
47public class DistributedLinkStore
Yuta HIGUCHI2e963892014-09-27 13:00:39 -070048 extends AbstractHazelcastStore<LinkEvent, LinkStoreDelegate>
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -070049 implements LinkStore {
50
51 private final Logger log = getLogger(getClass());
52
53 // Link inventory
54 private IMap<byte[], byte[]> rawLinks;
55 private LoadingCache<LinkKey, Optional<DefaultLink>> links;
56
57 // TODO synchronize?
58 // Egress and ingress link sets
59 private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create();
60 private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create();
61
Yuta HIGUCHI63ab3ea2014-09-28 22:08:41 -070062 private String linksListener;
63
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -070064 @Override
65 @Activate
66 public void activate() {
67 super.activate();
68
69 boolean includeValue = true;
70
71 // TODO decide on Map name scheme to avoid collision
72 rawLinks = theInstance.getMap("links");
73 final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader
Yuta HIGUCHIad4c2182014-09-29 11:16:23 -070074 = new OptionalCacheLoader<>(kryoSerializationService, rawLinks);
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -070075 links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
76 // refresh/populate cache based on notification from other instance
Yuta HIGUCHI63ab3ea2014-09-28 22:08:41 -070077 linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -070078
79 loadLinkCache();
80
81 log.info("Started");
82 }
83
84 @Deactivate
85 public void deactivate() {
Yuta HIGUCHI63ab3ea2014-09-28 22:08:41 -070086 rawLinks.removeEntryListener(linksListener);
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -070087 log.info("Stopped");
88 }
89
90 private void loadLinkCache() {
91 for (byte[] keyBytes : rawLinks.keySet()) {
92 final LinkKey id = deserialize(keyBytes);
93 links.refresh(id);
94 }
95 }
96
97 @Override
98 public int getLinkCount() {
99 return links.asMap().size();
100 }
101
102 @Override
103 public Iterable<Link> getLinks() {
104 Builder<Link> builder = ImmutableSet.builder();
105 for (Optional<DefaultLink> e : links.asMap().values()) {
106 if (e.isPresent()) {
107 builder.add(e.get());
108 }
109 }
110 return builder.build();
111 }
112
113 @Override
114 public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
115 return ImmutableSet.copyOf(srcLinks.get(deviceId));
116 }
117
118 @Override
119 public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
120 return ImmutableSet.copyOf(dstLinks.get(deviceId));
121 }
122
123 @Override
124 public Link getLink(ConnectPoint src, ConnectPoint dst) {
125 return links.getUnchecked(new LinkKey(src, dst)).orNull();
126 }
127
128 @Override
129 public Set<Link> getEgressLinks(ConnectPoint src) {
130 Set<Link> egress = new HashSet<>();
131 for (Link link : srcLinks.get(src.deviceId())) {
132 if (link.src().equals(src)) {
133 egress.add(link);
134 }
135 }
136 return egress;
137 }
138
139 @Override
140 public Set<Link> getIngressLinks(ConnectPoint dst) {
141 Set<Link> ingress = new HashSet<>();
142 for (Link link : dstLinks.get(dst.deviceId())) {
143 if (link.dst().equals(dst)) {
144 ingress.add(link);
145 }
146 }
147 return ingress;
148 }
149
150 @Override
151 public LinkEvent createOrUpdateLink(ProviderId providerId,
152 LinkDescription linkDescription) {
153 LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
154 Optional<DefaultLink> link = links.getUnchecked(key);
155 if (!link.isPresent()) {
156 return createLink(providerId, key, linkDescription);
157 }
158 return updateLink(providerId, link.get(), key, linkDescription);
159 }
160
161 // Creates and stores the link and returns the appropriate event.
162 private LinkEvent createLink(ProviderId providerId, LinkKey key,
163 LinkDescription linkDescription) {
164 DefaultLink link = new DefaultLink(providerId, key.src(), key.dst(),
165 linkDescription.type());
166 synchronized (this) {
167 final byte[] keyBytes = serialize(key);
168 rawLinks.put(keyBytes, serialize(link));
169 links.asMap().putIfAbsent(key, Optional.of(link));
170
171 addNewLink(link);
172 }
173 return new LinkEvent(LINK_ADDED, link);
174 }
175
176 // update Egress and ingress link sets
177 private void addNewLink(DefaultLink link) {
178 synchronized (this) {
179 srcLinks.put(link.src().deviceId(), link);
180 dstLinks.put(link.dst().deviceId(), link);
181 }
182 }
183
184 // Updates, if necessary the specified link and returns the appropriate event.
185 private LinkEvent updateLink(ProviderId providerId, DefaultLink link,
186 LinkKey key, LinkDescription linkDescription) {
187 // FIXME confirm Link update condition is OK
188 if (link.type() == INDIRECT && linkDescription.type() == DIRECT) {
189 synchronized (this) {
190
191 DefaultLink updated =
192 new DefaultLink(providerId, link.src(), link.dst(),
193 linkDescription.type());
194 final byte[] keyBytes = serialize(key);
195 rawLinks.put(keyBytes, serialize(updated));
196 links.asMap().replace(key, Optional.of(link), Optional.of(updated));
197
198 replaceLink(link, updated);
199 return new LinkEvent(LINK_UPDATED, updated);
200 }
201 }
202 return null;
203 }
204
205 // update Egress and ingress link sets
206 private void replaceLink(DefaultLink link, DefaultLink updated) {
207 synchronized (this) {
208 srcLinks.remove(link.src().deviceId(), link);
209 dstLinks.remove(link.dst().deviceId(), link);
210
211 srcLinks.put(link.src().deviceId(), updated);
212 dstLinks.put(link.dst().deviceId(), updated);
213 }
214 }
215
216 @Override
217 public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
218 synchronized (this) {
219 LinkKey key = new LinkKey(src, dst);
220 byte[] keyBytes = serialize(key);
221 Link link = deserialize(rawLinks.remove(keyBytes));
222 links.invalidate(key);
223 if (link != null) {
224 removeLink(link);
225 return new LinkEvent(LINK_REMOVED, link);
226 }
227 return null;
228 }
229 }
230
231 // update Egress and ingress link sets
232 private void removeLink(Link link) {
233 synchronized (this) {
234 srcLinks.remove(link.src().deviceId(), link);
235 dstLinks.remove(link.dst().deviceId(), link);
236 }
237 }
238
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -0700239 private class RemoteLinkEventHandler extends RemoteCacheEventHandler<LinkKey, DefaultLink> {
Yuta HIGUCHIa8a53eb2014-09-25 17:47:55 -0700240 public RemoteLinkEventHandler(LoadingCache<LinkKey, Optional<DefaultLink>> cache) {
241 super(cache);
242 }
243
244 @Override
245 protected void onAdd(LinkKey key, DefaultLink newVal) {
246 addNewLink(newVal);
247 notifyDelegate(new LinkEvent(LINK_ADDED, newVal));
248 }
249
250 @Override
251 protected void onUpdate(LinkKey key, DefaultLink oldVal, DefaultLink newVal) {
252 replaceLink(oldVal, newVal);
253 notifyDelegate(new LinkEvent(LINK_UPDATED, newVal));
254 }
255
256 @Override
257 protected void onRemove(LinkKey key, DefaultLink val) {
258 removeLink(val);
259 notifyDelegate(new LinkEvent(LINK_REMOVED, val));
260 }
261 }
262}