blob: bc848d1df753d21f3bc111cba798af936b95316f [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.mcast.impl;
17
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -070018import com.google.common.base.Objects;
Andrea Campanella545edb42018-03-20 16:37:29 -070019import com.google.common.collect.ImmutableSet;
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -070020import com.google.common.collect.Sets;
Andrea Campanella545edb42018-03-20 16:37:29 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onlab.packet.IpAddress;
28import org.onosproject.event.AbstractListenerManager;
29import org.onosproject.mcast.api.McastEvent;
30import org.onosproject.mcast.api.McastListener;
31import org.onosproject.mcast.api.McastRoute;
32import org.onosproject.mcast.api.McastRouteData;
33import org.onosproject.mcast.api.McastStore;
34import org.onosproject.mcast.api.McastStoreDelegate;
35import org.onosproject.mcast.api.MulticastRouteService;
36import org.onosproject.net.ConnectPoint;
37import org.onosproject.net.Host;
38import org.onosproject.net.HostId;
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -070039import org.onosproject.net.HostLocation;
Andrea Campanella545edb42018-03-20 16:37:29 -070040import org.onosproject.net.host.HostEvent;
41import org.onosproject.net.host.HostListener;
42import org.onosproject.net.host.HostService;
43import org.slf4j.Logger;
44
45import java.util.HashSet;
46import java.util.Optional;
47import java.util.Set;
Jordan Halterman9aff4ea2018-08-13 02:34:35 -070048import java.util.concurrent.ExecutorService;
49import java.util.concurrent.Executors;
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -070050import java.util.stream.Collectors;
Andrea Campanella545edb42018-03-20 16:37:29 -070051
52import static com.google.common.base.Preconditions.checkNotNull;
Jordan Halterman9aff4ea2018-08-13 02:34:35 -070053import static org.onlab.util.Tools.groupedThreads;
Andrea Campanella545edb42018-03-20 16:37:29 -070054import static org.slf4j.LoggerFactory.getLogger;
55
56/**
57 * An implementation of a multicast route table.
58 */
59@Component(immediate = true)
60@Service
61public class MulticastRouteManager
62 extends AbstractListenerManager<McastEvent, McastListener>
63 implements MulticastRouteService {
64 //TODO: add MulticastRouteAdminService
65
66 private Logger log = getLogger(getClass());
67
68 private final McastStoreDelegate delegate = new InternalMcastStoreDelegate();
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected McastStore store;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected HostService hostService;
75
76 private HostListener hostListener = new InternalHostListener();
Jordan Halterman9aff4ea2018-08-13 02:34:35 -070077 private ExecutorService hostEventExecutor;
Andrea Campanella545edb42018-03-20 16:37:29 -070078
79 @Activate
80 public void activate() {
Jordan Halterman9aff4ea2018-08-13 02:34:35 -070081 hostEventExecutor = Executors.newSingleThreadExecutor(groupedThreads("mcast-event-host", "%d", log));
Andrea Campanella545edb42018-03-20 16:37:29 -070082 hostService.addListener(hostListener);
83 eventDispatcher.addSink(McastEvent.class, listenerRegistry);
84 store.setDelegate(delegate);
85 log.info("Started");
86 }
87
88 @Deactivate
89 public void deactivate() {
Jordan Halterman9aff4ea2018-08-13 02:34:35 -070090 hostEventExecutor.shutdown();
Andrea Campanella545edb42018-03-20 16:37:29 -070091 hostService.removeListener(hostListener);
92 store.unsetDelegate(delegate);
93 eventDispatcher.removeSink(McastEvent.class);
94 log.info("Stopped");
95 }
96
97 @Override
98 public void add(McastRoute route) {
99 checkNotNull(route, "Route cannot be null");
100 store.storeRoute(route);
101 }
102
103 @Override
104 public void remove(McastRoute route) {
105 checkNotNull(route, "Route cannot be null");
106 if (checkRoute(route)) {
107 store.removeRoute(route);
108 }
109 }
110
111 @Override
112 public Set<McastRoute> getRoutes() {
113 return store.getRoutes();
114 }
115
116 @Override
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -0700117 public Set<McastRoute> getRoute(IpAddress groupIp, IpAddress sourceIp) {
118 // Let's transform it into an optional
119 final Optional<IpAddress> source = Optional.ofNullable(sourceIp);
120 return store.getRoutes().stream()
121 .filter(route -> route.group().equals(groupIp) &&
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200122 Objects.equal(route.source(), source))
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -0700123 .collect(Collectors.toSet());
Andrea Campanella545edb42018-03-20 16:37:29 -0700124 }
125
126 @Override
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200127 public void addSource(McastRoute route, HostId source) {
Andrea Campanella545edb42018-03-20 16:37:29 -0700128 checkNotNull(route, "Route cannot be null");
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200129 checkNotNull(source, "Source cannot be null");
Andrea Campanella545edb42018-03-20 16:37:29 -0700130 if (checkRoute(route)) {
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200131 Set<ConnectPoint> sources = new HashSet<>();
132 Host host = hostService.getHost(source);
133 if (host != null) {
134 sources.addAll(host.locations());
135 }
136 store.storeSource(route, source, sources);
137 }
138 }
139
140 @Override
141 public void addSources(McastRoute route, HostId hostId, Set<ConnectPoint> connectPoints) {
142 checkNotNull(route, "Route cannot be null");
143 checkNotNull(hostId, "HostId cannot be null");
144 checkNotNull(connectPoints, "Sources cannot be null");
145 if (checkRoute(route)) {
146 store.storeSource(route, hostId, connectPoints);
147 }
148 }
149
150 @Override
151 public void addSources(McastRoute route, Set<ConnectPoint> sources) {
152 checkNotNull(route, "Route cannot be null");
153 checkNotNull(sources, "sources cannot be null");
154 if (checkRoute(route)) {
155 store.storeSources(route, sources);
Andrea Campanella545edb42018-03-20 16:37:29 -0700156 }
157 }
158
159 @Override
160 public void removeSources(McastRoute route) {
161 checkNotNull(route, "Route cannot be null");
162 if (checkRoute(route)) {
163 store.removeSources(route);
164 }
165 }
166
167 @Override
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200168 public void removeSource(McastRoute route, HostId source) {
Andrea Campanella545edb42018-03-20 16:37:29 -0700169 checkNotNull(route, "Route cannot be null");
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200170 checkNotNull(source, "Source cannot be null");
Andrea Campanella545edb42018-03-20 16:37:29 -0700171 if (checkRoute(route)) {
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200172 store.removeSource(route, source);
Andrea Campanella545edb42018-03-20 16:37:29 -0700173 }
174 }
175
176 @Override
177 public void addSink(McastRoute route, HostId hostId) {
178 if (checkRoute(route)) {
179 Set<ConnectPoint> sinks = new HashSet<>();
180 Host host = hostService.getHost(hostId);
181 if (host != null) {
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200182 sinks.addAll(host.locations());
Andrea Campanella545edb42018-03-20 16:37:29 -0700183 }
184 store.addSink(route, hostId, sinks);
185 }
186
187 }
188
189 @Override
Andrea Campanella644a8a62018-03-21 19:08:21 -0700190 public void addSinks(McastRoute route, HostId hostId, Set<ConnectPoint> sinks) {
191 if (checkRoute(route)) {
192 store.addSink(route, hostId, sinks);
193 }
194
195 }
196
197 @Override
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200198 public void addSinks(McastRoute route, Set<ConnectPoint> sinks) {
Andrea Campanella545edb42018-03-20 16:37:29 -0700199 checkNotNull(route, "Route cannot be null");
200 checkNotNull(sinks, "Sinks cannot be null");
201 if (checkRoute(route)) {
202 store.addSinks(route, sinks);
203 }
204 }
205
206 @Override
207 public void removeSinks(McastRoute route) {
208 checkNotNull(route, "Route cannot be null");
209 if (checkRoute(route)) {
210 store.removeSinks(route);
211 }
212 }
213
214 @Override
215 public void removeSink(McastRoute route, HostId hostId) {
216 checkNotNull(route, "Route cannot be null");
217 checkNotNull(hostId, "Host cannot be null");
218 if (checkRoute(route)) {
219 store.removeSink(route, hostId);
220 }
221 }
222
223 @Override
Andrea Campanella545edb42018-03-20 16:37:29 -0700224 public void removeSinks(McastRoute route, Set<ConnectPoint> connectPoints) {
225 checkNotNull(route, "Route cannot be null");
226 if (checkRoute(route)) {
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -0700227 store.removeSinks(route, connectPoints);
Andrea Campanella545edb42018-03-20 16:37:29 -0700228 }
229 }
230
231 @Override
232 public McastRouteData routeData(McastRoute route) {
233 checkNotNull(route, "Route cannot be null");
234 return checkRoute(route) ? store.getRouteData(route) : null;
235 }
236
237 @Override
238 public Set<ConnectPoint> sources(McastRoute route) {
239 checkNotNull(route, "Route cannot be null");
240 return checkRoute(route) ? store.sourcesFor(route) : ImmutableSet.of();
241 }
242
243 @Override
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200244 public Set<ConnectPoint> sources(McastRoute route, HostId hostId) {
245 checkNotNull(route, "Route cannot be null");
246 return checkRoute(route) ? store.sourcesFor(route, hostId) : ImmutableSet.of();
247 }
248
249 @Override
Andrea Campanella545edb42018-03-20 16:37:29 -0700250 public Set<ConnectPoint> sinks(McastRoute route) {
251 checkNotNull(route, "Route cannot be null");
252 return checkRoute(route) ? store.sinksFor(route) : ImmutableSet.of();
253 }
254
255 @Override
256 public Set<ConnectPoint> sinks(McastRoute route, HostId hostId) {
257 checkNotNull(route, "Route cannot be null");
258 return checkRoute(route) ? store.sinksFor(route, hostId) : ImmutableSet.of();
259 }
260
261 @Override
262 public Set<ConnectPoint> nonHostSinks(McastRoute route) {
263 checkNotNull(route, "Route cannot be null");
264 return checkRoute(route) ? store.sinksFor(route, HostId.NONE) : ImmutableSet.of();
265 }
266
267 private class InternalMcastStoreDelegate implements McastStoreDelegate {
268 @Override
269 public void notify(McastEvent event) {
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -0700270 log.debug("Notify event: {}", event);
Andrea Campanella545edb42018-03-20 16:37:29 -0700271 post(event);
272 }
273 }
274
275 private boolean checkRoute(McastRoute route) {
276 if (store.getRoutes().contains(route)) {
277 return true;
278 } else {
279 log.warn("Route {} is not present in the store, please add it", route);
280 }
281 return false;
282 }
283
284 private class InternalHostListener implements HostListener {
285
286 @Override
287 public void event(HostEvent event) {
Jordan Halterman9aff4ea2018-08-13 02:34:35 -0700288 hostEventExecutor.execute(() -> {
289 HostId hostId = event.subject().id();
290 log.debug("Host event: {}", event);
291 Set<McastRoute> routesForSource = routesForSource(hostId);
292 Set<McastRoute> routesForSink = routesForSink(hostId);
293 switch (event.type()) {
294 case HOST_ADDED:
295 //the host is added, if it already comes with some locations let's use them
296 if (!routesForSource.isEmpty()) {
297 eventAddSources(hostId, event.subject().locations(), routesForSource);
298 }
299 if (!routesForSink.isEmpty()) {
300 eventAddSinks(hostId, event.subject().locations(), routesForSink);
301 }
302 break;
303 case HOST_MOVED:
304 //both subjects must be null or the system is in an incoherent state
305 if ((event.prevSubject() != null && event.subject() != null)) {
306 //we compute the difference between old locations and new ones and remove the previous
307 Set<HostLocation> removedConnectPoint = Sets.difference(event.prevSubject().locations(),
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -0700308 event.subject().locations()).immutableCopy();
Jordan Halterman9aff4ea2018-08-13 02:34:35 -0700309 if (!removedConnectPoint.isEmpty()) {
310 if (!routesForSource.isEmpty()) {
311 eventRemoveSources(hostId, removedConnectPoint, routesForSource);
312 }
313 if (!routesForSink.isEmpty()) {
314 eventRemoveSinks(hostId, removedConnectPoint, routesForSink);
315 }
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200316 }
Jordan Halterman9aff4ea2018-08-13 02:34:35 -0700317 Set<HostLocation> addedConnectPoints = Sets.difference(event.subject().locations(),
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -0700318 event.prevSubject().locations()).immutableCopy();
Jordan Halterman9aff4ea2018-08-13 02:34:35 -0700319 //if the host now has some new locations we add them to the sinks set
320 if (!addedConnectPoints.isEmpty()) {
321 if (!routesForSource.isEmpty()) {
322 eventAddSources(hostId, addedConnectPoints, routesForSource);
323 }
324 if (!routesForSink.isEmpty()) {
325 eventAddSinks(hostId, addedConnectPoints, routesForSink);
326 }
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200327 }
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -0700328 }
Jordan Halterman9aff4ea2018-08-13 02:34:35 -0700329 break;
330 case HOST_REMOVED:
331 // Removing all the connect points for that specific host
332 // even if the locations are 0 we keep
333 // the host information in the route in case it shows up again
334 if (!routesForSource.isEmpty()) {
335 eventRemoveSources(hostId, event.subject().locations(), routesForSource);
336 }
337 if (!routesForSink.isEmpty()) {
338 eventRemoveSinks(hostId, event.subject().locations(), routesForSink);
339 }
340 break;
341 case HOST_UPDATED:
342 default:
343 log.debug("Host event {} not handled", event.type());
344 }
345 });
Andrea Campanella545edb42018-03-20 16:37:29 -0700346 }
347 }
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -0700348
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200349 //Finds the route for which a host is source
350 private Set<McastRoute> routesForSource(HostId hostId) {
351 // Filter by host id
352 return store.getRoutes().stream().filter(mcastRoute -> store.getRouteData(mcastRoute)
353 .sources().containsKey(hostId)).collect(Collectors.toSet());
354 }
355
356 //Finds the route for which a host is sink
357 private Set<McastRoute> routesForSink(HostId hostId) {
358 return store.getRoutes().stream().filter(mcastRoute -> store.getRouteData(mcastRoute)
359 .sinks().containsKey(hostId)).collect(Collectors.toSet());
360 }
361
362 //Removes sources for a given host event
363 private void eventRemoveSources(HostId hostId, Set<HostLocation> removedSources, Set<McastRoute> routesForSource) {
364 Set<ConnectPoint> sources = new HashSet<>();
365 // Build sink using host location
366 sources.addAll(removedSources);
367 // Remove from each route the provided sinks
368 routesForSource.forEach(route -> store.removeSources(route, hostId, sources));
369 }
370
371 //Adds the sources for a given host event
372 private void eventAddSources(HostId hostId, Set<HostLocation> addedSources, Set<McastRoute> routesForSource) {
373 Set<ConnectPoint> sources = new HashSet<>();
374 // Build source using host location
375 sources.addAll(addedSources);
376 // Add to each route the provided sources
377 routesForSource.forEach(route -> store.storeSource(route, hostId, sources));
378 }
379
380 //Remove sinks for a given host event
381 private void eventRemoveSinks(HostId hostId, Set<HostLocation> removedSinks, Set<McastRoute> routesForSinks) {
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -0700382 Set<ConnectPoint> sinks = new HashSet<>();
383 // Build sink using host location
384 sinks.addAll(removedSinks);
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200385 // Remove from each route the provided sinks
386 routesForSinks.forEach(route -> store.removeSinks(route, hostId, sinks));
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -0700387 }
388
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200389 //Adds the sinks for a given host event
390 private void eventAddSinks(HostId hostId, Set<HostLocation> addedSinks, Set<McastRoute> routesForSinks) {
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -0700391 Set<ConnectPoint> sinks = new HashSet<>();
392 // Build sink using host location
393 sinks.addAll(addedSinks);
Andrea Campanella0ddf9b82018-04-27 15:54:42 +0200394 // Add to each route the provided sinks
395 routesForSinks.forEach(route -> store.addSink(route, hostId, sinks));
Andrea Campanella7c8bcdf2018-03-26 23:29:11 -0700396 }
Andrea Campanella545edb42018-03-20 16:37:29 -0700397}