blob: 6082842d7b480ab7cd663206131cc458b874b0e2 [file] [log] [blame]
jccd8697232015-05-05 14:42:23 +08001package org.onosproject.store.tunnel.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.ArrayList;
6import java.util.Collection;
7import java.util.Collections;
8import java.util.HashSet;
9import java.util.List;
10import java.util.Objects;
11import java.util.Set;
12
13import org.apache.felix.scr.annotations.Activate;
14import org.apache.felix.scr.annotations.Component;
15import org.apache.felix.scr.annotations.Deactivate;
16import org.apache.felix.scr.annotations.Reference;
17import org.apache.felix.scr.annotations.ReferenceCardinality;
18import org.apache.felix.scr.annotations.Service;
19import org.onlab.util.KryoNamespace;
20import org.onosproject.cluster.ClusterService;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
23import org.onosproject.core.IdGenerator;
24import org.onosproject.net.Annotations;
25import org.onosproject.net.provider.ProviderId;
26import org.onosproject.net.tunnel.DefaultTunnel;
27import org.onosproject.net.tunnel.Tunnel;
28import org.onosproject.net.tunnel.Tunnel.Type;
29import org.onosproject.net.tunnel.TunnelEndPoint;
30import org.onosproject.net.tunnel.TunnelEvent;
31import org.onosproject.net.tunnel.TunnelId;
32import org.onosproject.net.tunnel.TunnelName;
33import org.onosproject.net.tunnel.TunnelStore;
34import org.onosproject.net.tunnel.TunnelStoreDelegate;
35import org.onosproject.net.tunnel.TunnelSubscription;
36import org.onosproject.store.AbstractStore;
37import org.onosproject.store.app.GossipApplicationStore.InternalState;
38import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.service.EventuallyConsistentMap;
41import org.onosproject.store.service.MultiValuedTimestamp;
42import org.onosproject.store.service.StorageService;
43import org.onosproject.store.service.WallclockClockManager;
44import org.slf4j.Logger;
45
46import com.google.common.base.MoreObjects;
47import com.google.common.collect.ImmutableSet;
48
49/**
50 * Manages inventory of tunnel in distributed data store that uses optimistic
51 * replication and gossip based techniques.
52 */
53@Component(immediate = true)
54@Service
55public class DistributedTunnelStore
56 extends AbstractStore<TunnelEvent, TunnelStoreDelegate>
57 implements TunnelStore {
58
59 private final Logger log = getLogger(getClass());
60
61 /**
62 * The topic used for obtaining globally unique ids.
63 */
64 private String runnelOpTopoic = "tunnel-ops-ids";
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected ClusterCommunicationService clusterCommunicator;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected ClusterService clusterService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected CoreService coreService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected StorageService storageService;
77
78 // tunnel identity as map key in the store.
79 private EventuallyConsistentMap<TunnelId, Tunnel> tunnelIdAsKeyStore;
80 // tunnel name as map key in the store.
81 private EventuallyConsistentMap<TunnelName, Set<TunnelId>> tunnelNameAsKeyStore;
82 // maintains all the tunnels between source and destination.
83 private EventuallyConsistentMap<TunnelKey, Set<TunnelId>> srcAndDstKeyStore;
84 // maintains all the tunnels by tunnel type.
85 private EventuallyConsistentMap<Tunnel.Type, Set<TunnelId>> typeKeyStore;
86 // maintains records that app subscribes tunnel.
87 private EventuallyConsistentMap<ApplicationId, Set<TunnelSubscription>> orderRelationship;
88
89 private IdGenerator idGenerator;
90
91 @Activate
92 public void activate() {
93 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
94 .register(KryoNamespaces.API)
95 .register(MultiValuedTimestamp.class)
96 .register(InternalState.class);
97 tunnelIdAsKeyStore = storageService
98 .<TunnelId, Tunnel>eventuallyConsistentMapBuilder()
99 .withName("all_tunnel").withSerializer(serializer)
100 .withClockService(new WallclockClockManager<>()).build();
101 tunnelNameAsKeyStore = storageService
102 .<TunnelName, Set<TunnelId>>eventuallyConsistentMapBuilder()
103 .withName("tunnel_name_tunnel").withSerializer(serializer)
104 .withClockService(new WallclockClockManager<>()).build();
105 srcAndDstKeyStore = storageService
106 .<TunnelKey, Set<TunnelId>>eventuallyConsistentMapBuilder()
107 .withName("src_dst_tunnel").withSerializer(serializer)
108 .withClockService(new WallclockClockManager<>()).build();
109 typeKeyStore = storageService
110 .<Tunnel.Type, Set<TunnelId>>eventuallyConsistentMapBuilder()
111 .withName("type_tunnel").withSerializer(serializer)
112 .withClockService(new WallclockClockManager<>()).build();
113 idGenerator = coreService.getIdGenerator(runnelOpTopoic);
114 log.info("Started");
115 }
116
117 @Deactivate
118 public void deactivate() {
119 tunnelIdAsKeyStore.destroy();
120 srcAndDstKeyStore.destroy();
121 typeKeyStore.destroy();
122 tunnelNameAsKeyStore.destroy();
123 log.info("Stopped");
124 }
125
126 @Override
127 public TunnelId createOrUpdateTunnel(Tunnel tunnel) {
128 // tunnelIdAsKeyStore.
129 if (tunnel.tunnelId() != null && !"".equals(tunnel.tunnelId())) {
130 Tunnel old = tunnelIdAsKeyStore.get(tunnel.tunnelId());
131 if (old == null) {
132 log.info("This tunnel[" + tunnel.tunnelId() + "] is not available.");
133 return tunnel.tunnelId();
134 }
135 Tunnel newT = new DefaultTunnel(tunnel.providerId(), tunnel.src(),
136 tunnel.dst(), tunnel.type(),
137 tunnel.state(), tunnel.groupId(),
138 old.tunnelId(),
139 tunnel.tunnelName(),
140 tunnel.annotations());
141 tunnelIdAsKeyStore.remove(tunnel.tunnelId());
142 tunnelIdAsKeyStore.put(tunnel.tunnelId(), newT);
143 TunnelEvent event = new TunnelEvent(
144 TunnelEvent.Type.TUNNEL_UPDATED,
145 tunnel);
146 notifyDelegate(event);
147 return tunnel.tunnelId();
148 } else {
149 TunnelId tunnelId = TunnelId.valueOf(idGenerator.getNewId());
150 Tunnel newT = new DefaultTunnel(tunnel.providerId(), tunnel.src(),
151 tunnel.dst(), tunnel.type(),
152 tunnel.state(), tunnel.groupId(),
153 tunnelId,
154 tunnel.tunnelName(),
155 tunnel.annotations());
156 TunnelKey key = TunnelKey.tunnelKey(tunnel.src(), tunnel.dst());
157 tunnelIdAsKeyStore.put(tunnelId, newT);
158 Set<TunnelId> tunnelnameSet = tunnelNameAsKeyStore.get(tunnel
159 .tunnelName());
160 if (tunnelnameSet == null) {
161 tunnelnameSet = new HashSet<TunnelId>();
162 }
163 tunnelnameSet.add(tunnelId);
164 tunnelNameAsKeyStore.put(tunnel
165 .tunnelName(), tunnelnameSet);
166 Set<TunnelId> srcAndDstKeySet = srcAndDstKeyStore.get(key);
167 if (srcAndDstKeySet == null) {
168 srcAndDstKeySet = new HashSet<TunnelId>();
169 }
170 srcAndDstKeySet.add(tunnelId);
171 srcAndDstKeyStore.put(key, srcAndDstKeySet);
172 Set<TunnelId> typeKeySet = typeKeyStore.get(tunnel.type());
173 if (typeKeySet == null) {
174 typeKeySet = new HashSet<TunnelId>();
175 }
176 typeKeySet.add(tunnelId);
177 typeKeyStore.put(tunnel.type(), typeKeySet);
178 TunnelEvent event = new TunnelEvent(TunnelEvent.Type.TUNNEL_ADDED,
179 tunnel);
180 notifyDelegate(event);
181 return tunnelId;
182 }
183 }
184
185 @Override
186 public void deleteTunnel(TunnelId tunnelId) {
187 Tunnel deletedTunnel = tunnelIdAsKeyStore.get(tunnelId);
188 if (deletedTunnel == null) {
189 return;
190 }
191 tunnelNameAsKeyStore.get(deletedTunnel.tunnelName()).remove(tunnelId);
192 tunnelIdAsKeyStore.remove(tunnelId);
193 TunnelKey key = new TunnelKey(deletedTunnel.src(), deletedTunnel.dst());
194 srcAndDstKeyStore.get(key).remove(tunnelId);
195 typeKeyStore.get(deletedTunnel.type()).remove(tunnelId);
196 TunnelEvent event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED,
197 deletedTunnel);
198 notifyDelegate(event);
199 }
200
201 @Override
202 public void deleteTunnel(TunnelEndPoint src, TunnelEndPoint dst,
203 ProviderId producerName) {
204 TunnelKey key = TunnelKey.tunnelKey(src, dst);
205 Set<TunnelId> idSet = srcAndDstKeyStore.get(key);
206 if (idSet == null) {
207 return;
208 }
209 Tunnel deletedTunnel = null;
210 TunnelEvent event = null;
211 List<TunnelEvent> ls = new ArrayList<TunnelEvent>();
212 for (TunnelId id : idSet) {
213 deletedTunnel = tunnelIdAsKeyStore.get(id);
214 event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED,
215 deletedTunnel);
216 ls.add(event);
217 if (producerName.equals(deletedTunnel.providerId())) {
218 tunnelIdAsKeyStore.remove(deletedTunnel.tunnelId());
219 tunnelNameAsKeyStore.get(deletedTunnel.tunnelName())
220 .remove(deletedTunnel.tunnelId());
221 srcAndDstKeyStore.get(key).remove(deletedTunnel.tunnelId());
222 typeKeyStore.get(deletedTunnel.type())
223 .remove(deletedTunnel.tunnelId());
224 }
225 }
226 notifyDelegate(ls);
227 }
228
229 @Override
230 public void deleteTunnel(TunnelEndPoint src, TunnelEndPoint dst, Type type,
231 ProviderId producerName) {
232 TunnelKey key = TunnelKey.tunnelKey(src, dst);
233 Set<TunnelId> idSet = srcAndDstKeyStore.get(key);
234 if (idSet == null) {
235 return;
236 }
237 Tunnel deletedTunnel = null;
238 TunnelEvent event = null;
239 List<TunnelEvent> ls = new ArrayList<TunnelEvent>();
240 for (TunnelId id : idSet) {
241 deletedTunnel = tunnelIdAsKeyStore.get(id);
242 event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED,
243 deletedTunnel);
244 ls.add(event);
245 if (producerName.equals(deletedTunnel.providerId())
246 && type.equals(deletedTunnel.type())) {
247 tunnelIdAsKeyStore.remove(deletedTunnel.tunnelId());
248 tunnelNameAsKeyStore.get(deletedTunnel.tunnelName())
249 .remove(deletedTunnel.tunnelId());
250 srcAndDstKeyStore.get(key).remove(deletedTunnel.tunnelId());
251 typeKeyStore.get(deletedTunnel.type())
252 .remove(deletedTunnel.tunnelId());
253 }
254 }
255 notifyDelegate(ls);
256 }
257
258 @Override
259 public Tunnel borrowTunnel(ApplicationId appId, TunnelId tunnelId,
260 Annotations... annotations) {
261 Set<TunnelSubscription> orderSet = orderRelationship.get(appId);
262 if (orderSet == null) {
263 orderSet = new HashSet<TunnelSubscription>();
264 }
265 TunnelSubscription order = new TunnelSubscription(appId, null, null, tunnelId, null, null,
266 annotations);
267 Tunnel result = tunnelIdAsKeyStore.get(tunnelId);
268 if (result != null || Tunnel.State.INACTIVE.equals(result.state())) {
269 return null;
270 }
271 orderSet.add(order);
272 orderRelationship.put(appId, orderSet);
273 return result;
274 }
275
276 @Override
277 public Collection<Tunnel> borrowTunnel(ApplicationId appId,
278 TunnelEndPoint src,
279 TunnelEndPoint dst,
280 Annotations... annotations) {
281 Set<TunnelSubscription> orderSet = orderRelationship.get(appId);
282 if (orderSet == null) {
283 orderSet = new HashSet<TunnelSubscription>();
284 }
285 TunnelSubscription order = new TunnelSubscription(appId, src, dst, null, null, null, annotations);
286 boolean isExist = orderSet.contains(order);
287 if (!isExist) {
288 orderSet.add(order);
289 }
290 orderRelationship.put(appId, orderSet);
291 TunnelKey key = TunnelKey.tunnelKey(src, dst);
292 Set<TunnelId> idSet = srcAndDstKeyStore.get(key);
293 if (idSet == null || idSet.size() == 0) {
294 return Collections.emptySet();
295 }
296 Collection<Tunnel> tunnelSet = new HashSet<Tunnel>();
297 for (TunnelId tunnelId : idSet) {
298 Tunnel result = tunnelIdAsKeyStore.get(tunnelId);
299 if (Tunnel.State.ACTIVE.equals(result.state())) {
300 tunnelSet.add(result);
301 }
302 }
303 return tunnelSet;
304 }
305
306 @Override
307 public Collection<Tunnel> borrowTunnel(ApplicationId appId,
308 TunnelEndPoint src,
309 TunnelEndPoint dst, Type type,
310 Annotations... annotations) {
311 Set<TunnelSubscription> orderSet = orderRelationship.get(appId);
312 if (orderSet == null) {
313 orderSet = new HashSet<TunnelSubscription>();
314 }
315 TunnelSubscription order = new TunnelSubscription(appId, src, dst, null, type, null, annotations);
316 boolean isExist = orderSet.contains(order);
317 if (!isExist) {
318 orderSet.add(order);
319 }
320 orderRelationship.put(appId, orderSet);
321 TunnelKey key = TunnelKey.tunnelKey(src, dst);
322 Set<TunnelId> idSet = srcAndDstKeyStore.get(key);
323 if (idSet == null || idSet.size() == 0) {
324 return Collections.emptySet();
325 }
326 Collection<Tunnel> tunnelSet = new HashSet<Tunnel>();
327 for (TunnelId tunnelId : idSet) {
328 Tunnel result = tunnelIdAsKeyStore.get(tunnelId);
329 if (type.equals(result.type())
330 && Tunnel.State.ACTIVE.equals(result.state())) {
331 tunnelSet.add(result);
332 }
333 }
334 return tunnelSet;
335 }
336
337 @Override
338 public Collection<Tunnel> borrowTunnel(ApplicationId appId,
339 TunnelName tunnelName,
340 Annotations... annotations) {
341 Set<TunnelSubscription> orderSet = orderRelationship.get(appId);
342 if (orderSet == null) {
343 orderSet = new HashSet<TunnelSubscription>();
344 }
345 TunnelSubscription order = new TunnelSubscription(appId, null, null, null, null, tunnelName,
346 annotations);
347 boolean isExist = orderSet.contains(order);
348 if (!isExist) {
349 orderSet.add(order);
350 }
351 orderRelationship.put(appId, orderSet);
352 Set<TunnelId> idSet = tunnelNameAsKeyStore.get(tunnelName);
353 if (idSet == null || idSet.size() == 0) {
354 return Collections.emptySet();
355 }
356 Collection<Tunnel> tunnelSet = new HashSet<Tunnel>();
357 for (TunnelId tunnelId : idSet) {
358 Tunnel result = tunnelIdAsKeyStore.get(tunnelId);
359 if (Tunnel.State.ACTIVE.equals(result.state())) {
360 tunnelSet.add(result);
361 }
362 }
363 return tunnelSet;
364 }
365
366 @Override
367 public boolean returnTunnel(ApplicationId appId, TunnelName tunnelName,
368 Annotations... annotations) {
369 TunnelSubscription order = new TunnelSubscription(appId, null, null, null, null, tunnelName,
370 annotations);
371 return deleteOrder(order);
372 }
373
374 @Override
375 public boolean returnTunnel(ApplicationId appId, TunnelId tunnelId,
376 Annotations... annotations) {
377 TunnelSubscription order = new TunnelSubscription(appId, null, null, tunnelId, null, null,
378 annotations);
379 return deleteOrder(order);
380 }
381
382 @Override
383 public boolean returnTunnel(ApplicationId appId, TunnelEndPoint src,
384 TunnelEndPoint dst, Type type,
385 Annotations... annotations) {
386 TunnelSubscription order = new TunnelSubscription(appId, src, dst, null, type, null, annotations);
387 return deleteOrder(order);
388 }
389
390 @Override
391 public boolean returnTunnel(ApplicationId appId, TunnelEndPoint src,
392 TunnelEndPoint dst, Annotations... annotations) {
393 TunnelSubscription order = new TunnelSubscription(appId, src, dst, null, null, null, annotations);
394 return deleteOrder(order);
395 }
396
397 private boolean deleteOrder(TunnelSubscription order) {
398 Set<TunnelSubscription> orderSet = orderRelationship.get(order.consumerId());
399 if (orderSet == null) {
400 return true;
401 }
402 if (orderSet.contains(order)) {
403 orderSet.remove(order);
404 return true;
405 }
406 return false;
407 }
408
409 @Override
410 public Tunnel queryTunnel(TunnelId tunnelId) {
411 return tunnelIdAsKeyStore.get(tunnelId);
412 }
413
414 @Override
415 public Collection<TunnelSubscription> queryTunnelSubscription(ApplicationId appId) {
416 return orderRelationship.get(appId) != null ? ImmutableSet.copyOf(orderRelationship
417 .get(appId)) : Collections.emptySet();
418 }
419
420 @Override
421 public Collection<Tunnel> queryTunnel(Type type) {
422 Collection<Tunnel> result = new HashSet<Tunnel>();
423 Set<TunnelId> tunnelIds = typeKeyStore.get(type);
424 if (tunnelIds == null) {
425 return Collections.emptySet();
426 }
427 for (TunnelId id : tunnelIds) {
428 result.add(tunnelIdAsKeyStore.get(id));
429 }
430 return result.size() == 0 ? Collections.emptySet() : ImmutableSet
431 .copyOf(result);
432 }
433
434 @Override
435 public Collection<Tunnel> queryTunnel(TunnelEndPoint src, TunnelEndPoint dst) {
436 Collection<Tunnel> result = new HashSet<Tunnel>();
437 TunnelKey key = TunnelKey.tunnelKey(src, dst);
438 Set<TunnelId> tunnelIds = srcAndDstKeyStore.get(key);
439 if (tunnelIds == null) {
440 return Collections.emptySet();
441 }
442 for (TunnelId id : tunnelIds) {
443 result.add(tunnelIdAsKeyStore.get(id));
444 }
445 return result.size() == 0 ? Collections.emptySet() : ImmutableSet
446 .copyOf(result);
447 }
448
449 @Override
450 public int tunnelCount() {
451 return tunnelIdAsKeyStore.size();
452 }
453
454 /**
455 * Uses source TunnelPoint and destination TunnelPoint as map key.
456 */
457 private static final class TunnelKey {
458 private final TunnelEndPoint src;
459 private final TunnelEndPoint dst;
460
461 private TunnelKey(TunnelEndPoint src, TunnelEndPoint dst) {
462 this.src = src;
463 this.dst = dst;
464
465 }
466
467 /**
468 * create a map key.
469 *
470 * @param src
471 * @param dst
472 * @return a key using source ip and destination ip
473 */
474 static TunnelKey tunnelKey(TunnelEndPoint src, TunnelEndPoint dst) {
475 return new TunnelKey(src, dst);
476 }
477
478 @Override
479 public int hashCode() {
480 return Objects.hash(src, dst);
481 }
482
483 @Override
484 public boolean equals(Object obj) {
485 if (this == obj) {
486 return true;
487 }
488 if (obj instanceof TunnelKey) {
489 final TunnelKey other = (TunnelKey) obj;
490 return Objects.equals(this.src, other.src)
491 && Objects.equals(this.dst, other.dst);
492 }
493 return false;
494 }
495
496 @Override
497 public String toString() {
498 return MoreObjects.toStringHelper(getClass()).add("src", src)
499 .add("dst", dst).toString();
500 }
501 }
502
503}