blob: 004a550123555cc9335c926e6f07945a275157ed [file] [log] [blame]
Jonathan Hartbfc5c482016-04-05 18:57:00 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Jonathan Hartbfc5c482016-04-05 18:57:00 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.net.routing.impl;
18
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onlab.packet.IpAddress;
26import org.onlab.packet.MacAddress;
27import org.onosproject.event.ListenerService;
Jonathan Hartfd176612016-04-11 10:42:10 -070028import org.onosproject.incubator.net.routing.NextHop;
Jonathan Hartbfc5c482016-04-05 18:57:00 -070029import org.onosproject.incubator.net.routing.ResolvedRoute;
30import org.onosproject.incubator.net.routing.Route;
31import org.onosproject.incubator.net.routing.RouteAdminService;
32import org.onosproject.incubator.net.routing.RouteEvent;
33import org.onosproject.incubator.net.routing.RouteListener;
34import org.onosproject.incubator.net.routing.RouteService;
35import org.onosproject.incubator.net.routing.RouteStore;
36import org.onosproject.incubator.net.routing.RouteStoreDelegate;
37import org.onosproject.incubator.net.routing.RouteTableId;
38import org.onosproject.net.Host;
39import org.onosproject.net.host.HostEvent;
40import org.onosproject.net.host.HostListener;
41import org.onosproject.net.host.HostService;
42import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
45import javax.annotation.concurrent.GuardedBy;
46import java.util.Collection;
47import java.util.Collections;
48import java.util.HashMap;
49import java.util.Map;
50import java.util.Optional;
51import java.util.Set;
52import java.util.concurrent.BlockingQueue;
53import java.util.concurrent.ExecutorService;
54import java.util.concurrent.LinkedBlockingQueue;
55import java.util.concurrent.ThreadFactory;
56import java.util.function.Function;
57import java.util.stream.Collectors;
58
59import static java.util.concurrent.Executors.newSingleThreadExecutor;
60import static org.onlab.util.Tools.groupedThreads;
61
62/**
63 * Implementation of the unicast route service.
64 */
65@Service
66@Component
67public class RouteManager implements ListenerService<RouteEvent, RouteListener>,
68 RouteService, RouteAdminService {
69
70 private final Logger log = LoggerFactory.getLogger(getClass());
71
72 private RouteStoreDelegate delegate = new InternalRouteStoreDelegate();
73 private InternalHostListener hostListener = new InternalHostListener();
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected RouteStore routeStore;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected HostService hostService;
80
81 @GuardedBy(value = "this")
82 private Map<RouteListener, ListenerQueue> listeners = new HashMap<>();
83
84 private ThreadFactory threadFactory;
85
86 @Activate
87 protected void activate() {
88 threadFactory = groupedThreads("onos/route", "listener-%d");
89
90 routeStore.setDelegate(delegate);
91 hostService.addListener(hostListener);
92
93 }
94
95 @Deactivate
96 protected void deactivate() {
97 listeners.values().forEach(l -> l.stop());
98
99 routeStore.unsetDelegate(delegate);
100 hostService.removeListener(hostListener);
101 }
102
103 /**
104 * {@inheritDoc}
105 *
106 * In a departure from other services in ONOS, calling addListener will
107 * cause all current routes to be pushed to the listener before any new
108 * events are sent. This allows a listener to easily get the exact set of
109 * routes without worrying about missing any.
110 *
111 * @param listener listener to be added
112 */
113 @Override
114 public void addListener(RouteListener listener) {
115 synchronized (this) {
116 log.debug("Synchronizing current routes to new listener");
Jonathan Hart6c2e7962016-04-11 13:54:09 -0700117 ListenerQueue l = createListenerQueue(listener);
Jonathan Hartbfc5c482016-04-05 18:57:00 -0700118 routeStore.getRouteTables().forEach(table -> {
119 Collection<Route> routes = routeStore.getRoutes(table);
120 if (routes != null) {
121 routes.forEach(route ->
122 l.post(new RouteEvent(RouteEvent.Type.ROUTE_UPDATED,
123 new ResolvedRoute(route, routeStore.getNextHop(route.nextHop())))));
124 }
125 });
126
127 listeners.put(listener, l);
128
129 l.start();
130 log.debug("Route synchronization complete");
131 }
132 }
133
134 @Override
135 public void removeListener(RouteListener listener) {
136 synchronized (this) {
137 ListenerQueue l = listeners.remove(listener);
138 if (l != null) {
139 l.stop();
140 }
141 }
142 }
143
144 /**
145 * Posts an event to all listeners.
146 *
147 * @param event event
148 */
149 private void post(RouteEvent event) {
Jonathan Hartfd176612016-04-11 10:42:10 -0700150 log.debug("Sending event {}", event);
Jonathan Hartbfc5c482016-04-05 18:57:00 -0700151 synchronized (this) {
152 listeners.values().forEach(l -> l.post(event));
153 }
154 }
155
156 @Override
157 public Map<RouteTableId, Collection<Route>> getAllRoutes() {
158 return routeStore.getRouteTables().stream()
159 .collect(Collectors.toMap(Function.identity(),
160 table -> (table == null) ?
161 Collections.emptySet() : routeStore.getRoutes(table)));
162 }
163
164 @Override
165 public Route longestPrefixMatch(IpAddress ip) {
166 return routeStore.longestPrefixMatch(ip);
167 }
168
169 @Override
Jonathan Hartfd176612016-04-11 10:42:10 -0700170 public Collection<Route> getRoutesForNextHop(IpAddress nextHop) {
171 return routeStore.getRoutesForNextHop(nextHop);
172 }
173
174 @Override
175 public Set<NextHop> getNextHops() {
176 return routeStore.getNextHops().entrySet().stream()
177 .map(entry -> new NextHop(entry.getKey(), entry.getValue()))
178 .collect(Collectors.toSet());
179 }
180
181 @Override
Jonathan Hartbfc5c482016-04-05 18:57:00 -0700182 public void update(Collection<Route> routes) {
183 synchronized (this) {
184 routes.forEach(route -> {
Jonathan Hartfd176612016-04-11 10:42:10 -0700185 log.debug("Received update {}", route);
Jonathan Hartbfc5c482016-04-05 18:57:00 -0700186 routeStore.updateRoute(route);
187 resolve(route);
188 });
189 }
190 }
191
192 @Override
193 public void withdraw(Collection<Route> routes) {
194 synchronized (this) {
Jonathan Hartfd176612016-04-11 10:42:10 -0700195 routes.forEach(route -> {
196 log.debug("Received withdraw {}", routes);
197 routeStore.removeRoute(route);
198 });
Jonathan Hartbfc5c482016-04-05 18:57:00 -0700199 }
200 }
201
202 private void resolve(Route route) {
203 // Monitor the IP address for updates of the MAC address
204 hostService.startMonitoringIp(route.nextHop());
205
206 MacAddress nextHopMac = routeStore.getNextHop(route.nextHop());
207 if (nextHopMac == null) {
208 Set<Host> hosts = hostService.getHostsByIp(route.nextHop());
209 Optional<Host> host = hosts.stream().findFirst();
210 if (host.isPresent()) {
211 nextHopMac = host.get().mac();
212 }
213 }
214
215 if (nextHopMac != null) {
216 routeStore.updateNextHop(route.nextHop(), nextHopMac);
217 }
218 }
219
220 private void hostUpdated(Host host) {
221 synchronized (this) {
222 for (IpAddress ip : host.ipAddresses()) {
223 routeStore.updateNextHop(ip, host.mac());
224 }
225 }
226 }
227
228 private void hostRemoved(Host host) {
229 synchronized (this) {
230 for (IpAddress ip : host.ipAddresses()) {
231 routeStore.removeNextHop(ip, host.mac());
232 }
233 }
234 }
235
236 /**
Jonathan Hart6c2e7962016-04-11 13:54:09 -0700237 * Creates a new listener queue.
238 *
239 * @param listener route listener
240 * @return listener queue
Jonathan Hartbfc5c482016-04-05 18:57:00 -0700241 */
Jonathan Hart6c2e7962016-04-11 13:54:09 -0700242 ListenerQueue createListenerQueue(RouteListener listener) {
243 return new DefaultListenerQueue(listener);
244 }
245
246 /**
247 * Default route listener queue.
248 */
249 private class DefaultListenerQueue implements ListenerQueue {
Jonathan Hartbfc5c482016-04-05 18:57:00 -0700250
251 private final ExecutorService executorService;
252 private final BlockingQueue<RouteEvent> queue;
253 private final RouteListener listener;
254
255 /**
256 * Creates a new listener queue.
257 *
258 * @param listener route listener to queue updates for
259 */
Jonathan Hart6c2e7962016-04-11 13:54:09 -0700260 public DefaultListenerQueue(RouteListener listener) {
Jonathan Hartbfc5c482016-04-05 18:57:00 -0700261 this.listener = listener;
262 queue = new LinkedBlockingQueue<>();
263 executorService = newSingleThreadExecutor(threadFactory);
264 }
265
Jonathan Hart6c2e7962016-04-11 13:54:09 -0700266 @Override
Jonathan Hartbfc5c482016-04-05 18:57:00 -0700267 public void post(RouteEvent event) {
268 queue.add(event);
269 }
270
Jonathan Hart6c2e7962016-04-11 13:54:09 -0700271 @Override
Jonathan Hartbfc5c482016-04-05 18:57:00 -0700272 public void start() {
273 executorService.execute(this::poll);
274 }
275
Jonathan Hart6c2e7962016-04-11 13:54:09 -0700276 @Override
Jonathan Hartbfc5c482016-04-05 18:57:00 -0700277 public void stop() {
278 executorService.shutdown();
279 }
280
281 private void poll() {
282 try {
283 while (true) {
284 listener.event(queue.take());
285 }
286 } catch (InterruptedException e) {
287 log.info("Route listener event thread shutting down: {}", e.getMessage());
288 }
289 }
290
291 }
292
293 /**
294 * Delegate to receive events from the route store.
295 */
296 private class InternalRouteStoreDelegate implements RouteStoreDelegate {
297 @Override
298 public void notify(RouteEvent event) {
299 post(event);
300 }
301 }
302
303 /**
304 * Internal listener for host events.
305 */
306 private class InternalHostListener implements HostListener {
307 @Override
308 public void event(HostEvent event) {
309 switch (event.type()) {
310 case HOST_ADDED:
311 case HOST_UPDATED:
312 hostUpdated(event.subject());
313 break;
314 case HOST_REMOVED:
315 hostRemoved(event.subject());
316 break;
317 case HOST_MOVED:
318 break;
319 default:
320 break;
321 }
322 }
323 }
324
325}