blob: 3ec0abb0ed29db8f1581997f478acbe0cb1b4fec [file] [log] [blame]
alshabib79e52872015-12-07 16:01:01 -08001package org.onosproject.incubator.store.mcast.impl;
2
3import org.apache.felix.scr.annotations.Activate;
4import org.apache.felix.scr.annotations.Component;
5import org.apache.felix.scr.annotations.Deactivate;
6import org.apache.felix.scr.annotations.Reference;
7import org.apache.felix.scr.annotations.ReferenceCardinality;
8import org.apache.felix.scr.annotations.Service;
alshabib6ec1ee72015-12-18 19:42:24 -08009import org.onlab.packet.IpAddress;
alshabib79e52872015-12-07 16:01:01 -080010import org.onlab.util.KryoNamespace;
11import org.onosproject.net.ConnectPoint;
12import org.onosproject.net.mcast.McastEvent;
13import org.onosproject.net.mcast.McastRoute;
14import org.onosproject.net.mcast.McastRouteInfo;
15import org.onosproject.net.mcast.McastStore;
16import org.onosproject.net.mcast.McastStoreDelegate;
17import org.onosproject.store.AbstractStore;
18import org.onosproject.store.service.ConsistentMap;
19import org.onosproject.store.service.Serializer;
20import org.onosproject.store.service.StorageService;
21import org.slf4j.Logger;
22
23import java.util.List;
24import java.util.Map;
25import java.util.Set;
26
27import static org.slf4j.LoggerFactory.getLogger;
28
29/**
30 * A distributed mcast store implementation. Routes are stored consistently
31 * across the cluster.
32 */
33@Component(immediate = true)
34@Service
35public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreDelegate>
36 implements McastStore {
37 //FIXME the number of events that will potentially be generated here is
alshabib6ec1ee72015-12-18 19:42:24 -080038 // not sustainable, consider changing this to an eventually consistent
39 // map and not emitting events but rather use a provider-like mechanism
40 // to program the dataplane.
alshabib79e52872015-12-07 16:01:01 -080041
42 private static final String MCASTRIB = "mcast-rib-table";
43 private Logger log = getLogger(getClass());
44
45 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
46 private StorageService storageService;
47
Jonathan Hart51539b82015-10-29 09:53:04 -070048 protected ConsistentMap<McastRoute, MulticastData> mcastRib;
alshabib79e52872015-12-07 16:01:01 -080049 protected Map<McastRoute, MulticastData> mcastRoutes;
50
51
52 @Activate
53 public void activate() {
54
Jonathan Hart51539b82015-10-29 09:53:04 -070055 mcastRib = storageService.<McastRoute, MulticastData>consistentMapBuilder()
alshabib79e52872015-12-07 16:01:01 -080056 .withName(MCASTRIB)
57 .withSerializer(Serializer.using(KryoNamespace.newBuilder().register(
58 MulticastData.class,
59 McastRoute.class,
60 McastRoute.Type.class,
alshabib6ec1ee72015-12-18 19:42:24 -080061 IpAddress.class,
alshabib79e52872015-12-07 16:01:01 -080062 List.class,
63 ConnectPoint.class
64 ).build()))
65 .withRelaxedReadConsistency()
66 .build();
67
Jonathan Hart51539b82015-10-29 09:53:04 -070068 mcastRoutes = mcastRib.asJavaMap();
alshabib79e52872015-12-07 16:01:01 -080069
70
71 log.info("Started");
72 }
73
74 @Deactivate
75 public void deactivate() {
76 log.info("Stopped");
77 }
78
79 @Override
80 public void storeRoute(McastRoute route, Type operation) {
81 switch (operation) {
82 case ADD:
83 if (mcastRoutes.putIfAbsent(route, MulticastData.empty()) == null) {
84 delegate.notify(new McastEvent(McastEvent.Type.ROUTE_ADDED,
85 McastRouteInfo.mcastRouteInfo(route)));
86 }
87 break;
88 case REMOVE:
89 if (mcastRoutes.remove(route) != null) {
90 delegate.notify(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
91 McastRouteInfo.mcastRouteInfo(route)));
92 }
93 break;
94 default:
95 log.warn("Unknown mcast operation type: {}", operation);
96 }
97 }
98
99 @Override
100 public void storeSource(McastRoute route, ConnectPoint source) {
101 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
102 if (v == null) {
103 return new MulticastData(source);
104 } else {
105 v.setSource(source);
106 }
107 return v;
108 });
109
110
111 if (data != null) {
112 delegate.notify(new McastEvent(McastEvent.Type.SOURCE_ADDED,
113 McastRouteInfo.mcastRouteInfo(route,
114 data.sinks(),
115 source)));
116 }
117
118 }
119
120 @Override
121 public void storeSink(McastRoute route, ConnectPoint sink, Type operation) {
122 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
123 switch (operation) {
124 case ADD:
125 if (v == null) {
126 v = MulticastData.empty();
127 }
128 v.appendSink(sink);
129 break;
130 case REMOVE:
131 if (v != null) {
132 v.removeSink(sink);
133 }
134 break;
135 default:
136 log.warn("Unknown mcast operation type: {}", operation);
137 }
138 return v;
139 });
140
141
142 if (data != null) {
143 switch (operation) {
144 case ADD:
145 delegate.notify(new McastEvent(
146 McastEvent.Type.SINK_ADDED,
147 McastRouteInfo.mcastRouteInfo(route,
148 sink,
149 data.source())));
150 break;
151 case REMOVE:
152 if (data != null) {
153 delegate.notify(new McastEvent(
154 McastEvent.Type.SINK_REMOVED,
155 McastRouteInfo.mcastRouteInfo(route,
156 sink,
157 data.source())));
158 }
159 break;
160 default:
161 log.warn("Unknown mcast operation type: {}", operation);
162 }
163 }
164
165 }
166
167 @Override
168 public ConnectPoint sourceFor(McastRoute route) {
169 return mcastRoutes.getOrDefault(route, MulticastData.empty()).source();
170 }
171
172 @Override
173 public Set<ConnectPoint> sinksFor(McastRoute route) {
174 return mcastRoutes.getOrDefault(route, MulticastData.empty()).sinks();
175 }
176
177}