// blob: 331ecbb554336a5de19485d87ea59d5699ac44a0
package org.onosproject.incubator.store.mcast.impl;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.mcast.McastEvent;
import org.onosproject.net.mcast.McastRoute;
import org.onosproject.net.mcast.McastRouteInfo;
import org.onosproject.net.mcast.McastStore;
import org.onosproject.net.mcast.McastStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;

import java.util.Map;
import java.util.Set;

import static org.slf4j.LoggerFactory.getLogger;

28/**
29 * A distributed mcast store implementation. Routes are stored consistently
30 * across the cluster.
31 */
32@Component(immediate = true)
33@Service
34public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreDelegate>
35 implements McastStore {
36 //FIXME the number of events that will potentially be generated here is
alshabib6ec1ee72015-12-18 19:42:24 -080037 // not sustainable, consider changing this to an eventually consistent
38 // map and not emitting events but rather use a provider-like mechanism
39 // to program the dataplane.
alshabib79e52872015-12-07 16:01:01 -080040
41 private static final String MCASTRIB = "mcast-rib-table";
42 private Logger log = getLogger(getClass());
43
44 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart07eb0412016-02-08 16:42:29 -080045 protected StorageService storageService;
alshabib79e52872015-12-07 16:01:01 -080046
Jonathan Hart51539b82015-10-29 09:53:04 -070047 protected ConsistentMap<McastRoute, MulticastData> mcastRib;
alshabib79e52872015-12-07 16:01:01 -080048 protected Map<McastRoute, MulticastData> mcastRoutes;
49
50
51 @Activate
52 public void activate() {
53
Jonathan Hart51539b82015-10-29 09:53:04 -070054 mcastRib = storageService.<McastRoute, MulticastData>consistentMapBuilder()
alshabib79e52872015-12-07 16:01:01 -080055 .withName(MCASTRIB)
Jonathan Hart07eb0412016-02-08 16:42:29 -080056 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
Jonathan Hart6ccfc5a2016-02-12 19:26:02 -080057 .register(KryoNamespaces.API)
Jonathan Hart07eb0412016-02-08 16:42:29 -080058 .register(
alshabib79e52872015-12-07 16:01:01 -080059 MulticastData.class,
60 McastRoute.class,
Jonathan Hart6ccfc5a2016-02-12 19:26:02 -080061 McastRoute.Type.class
alshabib79e52872015-12-07 16:01:01 -080062 ).build()))
Jonathan Hart1d006392016-02-11 09:12:56 -080063 //.withRelaxedReadConsistency()
alshabib79e52872015-12-07 16:01:01 -080064 .build();
65
Jonathan Hart51539b82015-10-29 09:53:04 -070066 mcastRoutes = mcastRib.asJavaMap();
alshabib79e52872015-12-07 16:01:01 -080067
68
69 log.info("Started");
70 }
71
72 @Deactivate
73 public void deactivate() {
74 log.info("Stopped");
75 }
76
77 @Override
78 public void storeRoute(McastRoute route, Type operation) {
79 switch (operation) {
80 case ADD:
81 if (mcastRoutes.putIfAbsent(route, MulticastData.empty()) == null) {
82 delegate.notify(new McastEvent(McastEvent.Type.ROUTE_ADDED,
83 McastRouteInfo.mcastRouteInfo(route)));
84 }
85 break;
86 case REMOVE:
87 if (mcastRoutes.remove(route) != null) {
88 delegate.notify(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
89 McastRouteInfo.mcastRouteInfo(route)));
90 }
91 break;
92 default:
93 log.warn("Unknown mcast operation type: {}", operation);
94 }
95 }
96
97 @Override
98 public void storeSource(McastRoute route, ConnectPoint source) {
99 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
100 if (v == null) {
101 return new MulticastData(source);
102 } else {
103 v.setSource(source);
104 }
105 return v;
106 });
107
108
109 if (data != null) {
110 delegate.notify(new McastEvent(McastEvent.Type.SOURCE_ADDED,
111 McastRouteInfo.mcastRouteInfo(route,
112 data.sinks(),
113 source)));
114 }
115
116 }
117
118 @Override
119 public void storeSink(McastRoute route, ConnectPoint sink, Type operation) {
120 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
121 switch (operation) {
122 case ADD:
123 if (v == null) {
124 v = MulticastData.empty();
125 }
126 v.appendSink(sink);
127 break;
128 case REMOVE:
129 if (v != null) {
130 v.removeSink(sink);
131 }
132 break;
133 default:
134 log.warn("Unknown mcast operation type: {}", operation);
135 }
136 return v;
137 });
138
139
140 if (data != null) {
141 switch (operation) {
142 case ADD:
143 delegate.notify(new McastEvent(
144 McastEvent.Type.SINK_ADDED,
145 McastRouteInfo.mcastRouteInfo(route,
146 sink,
147 data.source())));
148 break;
149 case REMOVE:
150 if (data != null) {
151 delegate.notify(new McastEvent(
152 McastEvent.Type.SINK_REMOVED,
153 McastRouteInfo.mcastRouteInfo(route,
154 sink,
155 data.source())));
156 }
157 break;
158 default:
159 log.warn("Unknown mcast operation type: {}", operation);
160 }
161 }
162
163 }
164
165 @Override
166 public ConnectPoint sourceFor(McastRoute route) {
167 return mcastRoutes.getOrDefault(route, MulticastData.empty()).source();
168 }
169
170 @Override
171 public Set<ConnectPoint> sinksFor(McastRoute route) {
172 return mcastRoutes.getOrDefault(route, MulticastData.empty()).sinks();
173 }
174
Jonathan Hart07eb0412016-02-08 16:42:29 -0800175 @Override
176 public Set<McastRoute> getRoutes() {
177 return mcastRoutes.keySet();
178 }
179
alshabib79e52872015-12-07 16:01:01 -0800180}