blob: 5af5266bc663a7e5768f1908cb1ddc06f49c6c21 [file] [log] [blame]
alshabib79e52872015-12-07 16:01:01 -08001package org.onosproject.incubator.store.mcast.impl;
2
3import org.apache.felix.scr.annotations.Activate;
4import org.apache.felix.scr.annotations.Component;
5import org.apache.felix.scr.annotations.Deactivate;
6import org.apache.felix.scr.annotations.Reference;
7import org.apache.felix.scr.annotations.ReferenceCardinality;
8import org.apache.felix.scr.annotations.Service;
alshabib79e52872015-12-07 16:01:01 -08009import org.onlab.util.KryoNamespace;
10import org.onosproject.net.ConnectPoint;
11import org.onosproject.net.mcast.McastEvent;
12import org.onosproject.net.mcast.McastRoute;
13import org.onosproject.net.mcast.McastRouteInfo;
14import org.onosproject.net.mcast.McastStore;
15import org.onosproject.net.mcast.McastStoreDelegate;
16import org.onosproject.store.AbstractStore;
Jonathan Hart07eb0412016-02-08 16:42:29 -080017import org.onosproject.store.serializers.KryoNamespaces;
alshabib79e52872015-12-07 16:01:01 -080018import org.onosproject.store.service.ConsistentMap;
19import org.onosproject.store.service.Serializer;
20import org.onosproject.store.service.StorageService;
21import org.slf4j.Logger;
22
alshabib79e52872015-12-07 16:01:01 -080023import java.util.Map;
24import java.util.Set;
25
26import static org.slf4j.LoggerFactory.getLogger;
27
28/**
29 * A distributed mcast store implementation. Routes are stored consistently
30 * across the cluster.
31 */
32@Component(immediate = true)
33@Service
34public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreDelegate>
35 implements McastStore {
36 //FIXME the number of events that will potentially be generated here is
alshabib6ec1ee72015-12-18 19:42:24 -080037 // not sustainable, consider changing this to an eventually consistent
38 // map and not emitting events but rather use a provider-like mechanism
39 // to program the dataplane.
alshabib79e52872015-12-07 16:01:01 -080040
41 private static final String MCASTRIB = "mcast-rib-table";
42 private Logger log = getLogger(getClass());
43
44 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart07eb0412016-02-08 16:42:29 -080045 protected StorageService storageService;
alshabib79e52872015-12-07 16:01:01 -080046
Jonathan Hart51539b82015-10-29 09:53:04 -070047 protected ConsistentMap<McastRoute, MulticastData> mcastRib;
alshabib79e52872015-12-07 16:01:01 -080048 protected Map<McastRoute, MulticastData> mcastRoutes;
49
50
51 @Activate
52 public void activate() {
53
Jonathan Hart51539b82015-10-29 09:53:04 -070054 mcastRib = storageService.<McastRoute, MulticastData>consistentMapBuilder()
alshabib79e52872015-12-07 16:01:01 -080055 .withName(MCASTRIB)
Jonathan Hart07eb0412016-02-08 16:42:29 -080056 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
57 .register(KryoNamespaces.BASIC)
58 .register(KryoNamespaces.MISC)
59 .register(
alshabib79e52872015-12-07 16:01:01 -080060 MulticastData.class,
61 McastRoute.class,
62 McastRoute.Type.class,
alshabib79e52872015-12-07 16:01:01 -080063 ConnectPoint.class
64 ).build()))
Jonathan Hart1d006392016-02-11 09:12:56 -080065 //.withRelaxedReadConsistency()
alshabib79e52872015-12-07 16:01:01 -080066 .build();
67
Jonathan Hart51539b82015-10-29 09:53:04 -070068 mcastRoutes = mcastRib.asJavaMap();
alshabib79e52872015-12-07 16:01:01 -080069
70
71 log.info("Started");
72 }
73
74 @Deactivate
75 public void deactivate() {
76 log.info("Stopped");
77 }
78
79 @Override
80 public void storeRoute(McastRoute route, Type operation) {
81 switch (operation) {
82 case ADD:
83 if (mcastRoutes.putIfAbsent(route, MulticastData.empty()) == null) {
84 delegate.notify(new McastEvent(McastEvent.Type.ROUTE_ADDED,
85 McastRouteInfo.mcastRouteInfo(route)));
86 }
87 break;
88 case REMOVE:
89 if (mcastRoutes.remove(route) != null) {
90 delegate.notify(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
91 McastRouteInfo.mcastRouteInfo(route)));
92 }
93 break;
94 default:
95 log.warn("Unknown mcast operation type: {}", operation);
96 }
97 }
98
99 @Override
100 public void storeSource(McastRoute route, ConnectPoint source) {
101 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
102 if (v == null) {
103 return new MulticastData(source);
104 } else {
105 v.setSource(source);
106 }
107 return v;
108 });
109
110
111 if (data != null) {
112 delegate.notify(new McastEvent(McastEvent.Type.SOURCE_ADDED,
113 McastRouteInfo.mcastRouteInfo(route,
114 data.sinks(),
115 source)));
116 }
117
118 }
119
120 @Override
121 public void storeSink(McastRoute route, ConnectPoint sink, Type operation) {
122 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
123 switch (operation) {
124 case ADD:
125 if (v == null) {
126 v = MulticastData.empty();
127 }
128 v.appendSink(sink);
129 break;
130 case REMOVE:
131 if (v != null) {
132 v.removeSink(sink);
133 }
134 break;
135 default:
136 log.warn("Unknown mcast operation type: {}", operation);
137 }
138 return v;
139 });
140
141
142 if (data != null) {
143 switch (operation) {
144 case ADD:
145 delegate.notify(new McastEvent(
146 McastEvent.Type.SINK_ADDED,
147 McastRouteInfo.mcastRouteInfo(route,
148 sink,
149 data.source())));
150 break;
151 case REMOVE:
152 if (data != null) {
153 delegate.notify(new McastEvent(
154 McastEvent.Type.SINK_REMOVED,
155 McastRouteInfo.mcastRouteInfo(route,
156 sink,
157 data.source())));
158 }
159 break;
160 default:
161 log.warn("Unknown mcast operation type: {}", operation);
162 }
163 }
164
165 }
166
167 @Override
168 public ConnectPoint sourceFor(McastRoute route) {
169 return mcastRoutes.getOrDefault(route, MulticastData.empty()).source();
170 }
171
172 @Override
173 public Set<ConnectPoint> sinksFor(McastRoute route) {
174 return mcastRoutes.getOrDefault(route, MulticastData.empty()).sinks();
175 }
176
Jonathan Hart07eb0412016-02-08 16:42:29 -0800177 @Override
178 public Set<McastRoute> getRoutes() {
179 return mcastRoutes.keySet();
180 }
181
alshabib79e52872015-12-07 16:01:01 -0800182}