blob: 8bdf42e630212609cf8da16f212cf81048c38cab [file] [log] [blame]
Andrea Campanella545edb42018-03-20 16:37:29 -07001/*
2 * Copyright 2015-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.mcast.impl;
17
18
19import com.google.common.collect.ImmutableSet;
20import com.google.common.collect.Sets;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.util.KryoNamespace;
28import org.onosproject.mcast.api.McastEvent;
29import org.onosproject.mcast.api.McastRoute;
30import org.onosproject.mcast.api.McastRouteData;
31import org.onosproject.mcast.api.McastStore;
32import org.onosproject.mcast.api.McastStoreDelegate;
33import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.HostId;
35import org.onosproject.store.AbstractStore;
36import org.onosproject.store.serializers.KryoNamespaces;
37import org.onosproject.store.service.ConsistentMap;
38import org.onosproject.store.service.MapEvent;
39import org.onosproject.store.service.MapEventListener;
40import org.onosproject.store.service.Serializer;
41import org.onosproject.store.service.StorageService;
42import org.onosproject.store.service.Versioned;
43import org.slf4j.Logger;
44
45import java.util.Collection;
46import java.util.Map;
47import java.util.Optional;
48import java.util.Set;
49import java.util.concurrent.ScheduledExecutorService;
50import java.util.concurrent.atomic.AtomicReference;
51import java.util.stream.Collectors;
52
53import static com.google.common.base.Preconditions.checkNotNull;
54import static org.onosproject.mcast.api.McastRouteUpdate.mcastRouteUpdate;
55import static org.slf4j.LoggerFactory.getLogger;
56
57/**
58 * New distributed mcast route store implementation. Routes are stored consistently
59 * across the cluster.
60 */
61@Component(immediate = true)
62@Service
63public class DistributedMcastRoutesStore
64 extends AbstractStore<McastEvent, McastStoreDelegate>
65 implements McastStore {
66
67 private static final String MCASTRIB = "onos-mcast-route-table";
68 private Logger log = getLogger(getClass());
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected StorageService storageService;
72
73 private Map<McastRoute, McastRouteData> mcastRoutes;
74 private ConsistentMap<McastRoute, McastRouteData> mcastRib;
75 private MapEventListener<McastRoute, McastRouteData> mcastRouteListener =
76 new McastRouteListener();
77
78 private ScheduledExecutorService executor;
79
80
81 @Activate
82 public void activate() {
83 mcastRib = storageService.<McastRoute, McastRouteData>consistentMapBuilder()
84 .withName(MCASTRIB)
85 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
86 .register(KryoNamespaces.API)
87 .register(
88 McastRoute.class,
89 AtomicReference.class,
90 McastRouteData.class,
91 McastRoute.Type.class
92 ).build()))
93 .build();
94
95 mcastRoutes = mcastRib.asJavaMap();
96 mcastRib.addListener(mcastRouteListener);
97
98 log.info("Started");
99 }
100
101 @Deactivate
102 public void deactivate() {
103 mcastRib.removeListener(mcastRouteListener);
104 mcastRib.destroy();
105 log.info("Stopped");
106 }
107
108 @Override
109 public void storeRoute(McastRoute route) {
110 mcastRoutes.put(route, McastRouteData.empty());
111 }
112
113 @Override
114 public void removeRoute(McastRoute route) {
115 mcastRoutes.remove(route);
116 }
117
118 @Override
119 public void storeSources(McastRoute route, Set<ConnectPoint> sources) {
120 McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
121 v.addSources(sources);
122 return v;
123 });
124 }
125
126 @Override
127 public void removeSources(McastRoute route) {
128 McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
129 v.removeSources();
130 return v;
131 });
132 }
133
134 @Override
135 public void removeSources(McastRoute route, Set<ConnectPoint> sources) {
136 McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
137 v.removeSources(sources);
138 return v;
139 });
140
141 }
142
143 @Override
144 public void addSink(McastRoute route, HostId hostId, Set<ConnectPoint> sinks) {
145 McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
146 v.addSinks(hostId, sinks);
147 return v;
148 });
149 }
150
151 @Override
152 public void addSinks(McastRoute route, Set<ConnectPoint> sinks) {
153 McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
154 v.addSinks(HostId.NONE, sinks);
155 return v;
156 });
157 }
158
159
160 @Override
161 public void removeSinks(McastRoute route) {
162 McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
163 v.removeSinks();
164 return v;
165 });
166 }
167
168 @Override
169 public void removeSink(McastRoute route, HostId hostId) {
170 McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
171 v.removeSinks(hostId);
172 return v;
173 });
174 }
175
176 @Override
177 public void removeSinks(McastRoute route, HostId hostId, Set<ConnectPoint> sinks) {
178 McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
179 v.removeSinks(hostId, sinks);
180 return v;
181 });
182 }
183
184 @Override
185 public void removeSinks(McastRoute route, Set<ConnectPoint> sinks) {
186 McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
187 v.removeSinks(HostId.NONE, sinks);
188 return v;
189 });
190 }
191
192 @Override
193 public Set<ConnectPoint> sourcesFor(McastRoute route) {
194 McastRouteData data = mcastRoutes.getOrDefault(route, null);
195 return data == null ? ImmutableSet.of() : ImmutableSet.copyOf(data.sources());
196 }
197
198 @Override
199 public Set<ConnectPoint> sinksFor(McastRoute route) {
200 McastRouteData data = mcastRoutes.getOrDefault(route, null);
201 return data == null ? ImmutableSet.of() : ImmutableSet.copyOf(data.sinks().values().stream()
202 .flatMap(Collection::stream).collect(Collectors.toSet()));
203 }
204
205 @Override
206 public Set<ConnectPoint> sinksFor(McastRoute route, HostId hostId) {
207 McastRouteData data = mcastRoutes.getOrDefault(route, null);
208 return data == null ? ImmutableSet.of() : ImmutableSet.copyOf(data.sinks(hostId));
209 }
210
211 @Override
212 public Set<McastRoute> getRoutes() {
213 return ImmutableSet.copyOf(mcastRoutes.keySet());
214 }
215
216 @Override
217 public McastRouteData getRouteData(McastRoute route) {
218 return mcastRoutes.get(route);
219 }
220
221 private class McastRouteListener implements MapEventListener<McastRoute, McastRouteData> {
222 @Override
223 public void event(MapEvent<McastRoute, McastRouteData> event) {
224 final McastRoute route = event.key();
225 final McastRouteData newData =
226 Optional.ofNullable(event.newValue()).map(Versioned::value).orElse(null);
227 final McastRouteData oldData =
228 Optional.ofNullable(event.oldValue()).map(Versioned::value).orElse(null);
229
230 switch (event.type()) {
231 case INSERT:
232 checkNotNull(newData);
233 McastEvent.Type type;
234 if (!newData.sources().isEmpty() || !newData.sinks().isEmpty()) {
235 type = McastEvent.Type.SOURCES_ADDED;
236 } else if (!newData.sinks().isEmpty()) {
237 type = McastEvent.Type.SINKS_ADDED;
238 } else {
239 type = McastEvent.Type.ROUTE_ADDED;
240 }
241 notifyDelegate(new McastEvent(type, null,
242 mcastRouteUpdate(route, newData.sources(), newData.sinks())));
243 break;
244 case UPDATE:
245 checkNotNull(newData);
246 checkNotNull(oldData);
247
248 if (!Sets.difference(newData.sources(), oldData.sources()).isEmpty()) {
249 notifyDelegate(new McastEvent(McastEvent.Type.SOURCES_ADDED,
250 mcastRouteUpdate(route, oldData.sources(), oldData.sinks()),
251 mcastRouteUpdate(route, newData.sources(), newData.sinks())));
252 }
253 if (!Sets.difference(oldData.sources(), newData.sources()).isEmpty()) {
254 notifyDelegate(new McastEvent(McastEvent.Type.SOURCES_REMOVED,
255 mcastRouteUpdate(route, oldData.sources(), oldData.sinks()),
256 mcastRouteUpdate(route, newData.sources(), newData.sinks())));
257 }
258 if (newData.allSinks().size() > oldData.allSinks().size()) {
259 notifyDelegate(new McastEvent(McastEvent.Type.SINKS_ADDED,
260 mcastRouteUpdate(route, oldData.sources(), oldData.sinks()),
261 mcastRouteUpdate(route, newData.sources(), newData.sinks())));
262 } else if (newData.allSinks().size() < oldData.allSinks().size()) {
263 log.info("Removed");
264 notifyDelegate(new McastEvent(McastEvent.Type.SINKS_REMOVED,
265 mcastRouteUpdate(route, oldData.sources(), oldData.sinks()),
266 mcastRouteUpdate(route, newData.sources(), newData.sinks())));
267 }
268 break;
269 case REMOVE:
270 // Verify old data is not null
271 checkNotNull(oldData);
272 // Create a route removed event with just the route
273 // and the source connect point
274 notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
275 mcastRouteUpdate(route, oldData.sources(), oldData.sinks()),
276 null));
277 break;
278 default:
279 log.warn("Unknown mcast operation type: {}", event.type());
280 }
281 }
282 }
283}