/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.incubator.net.routing.impl;
18
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onlab.packet.IpAddress;
26import org.onlab.packet.MacAddress;
27import org.onosproject.event.ListenerService;
28import org.onosproject.incubator.net.routing.ResolvedRoute;
29import org.onosproject.incubator.net.routing.Route;
30import org.onosproject.incubator.net.routing.RouteAdminService;
31import org.onosproject.incubator.net.routing.RouteEvent;
32import org.onosproject.incubator.net.routing.RouteListener;
33import org.onosproject.incubator.net.routing.RouteService;
34import org.onosproject.incubator.net.routing.RouteStore;
35import org.onosproject.incubator.net.routing.RouteStoreDelegate;
36import org.onosproject.incubator.net.routing.RouteTableId;
37import org.onosproject.net.Host;
38import org.onosproject.net.host.HostEvent;
39import org.onosproject.net.host.HostListener;
40import org.onosproject.net.host.HostService;
41import org.slf4j.Logger;
42import org.slf4j.LoggerFactory;
43
44import javax.annotation.concurrent.GuardedBy;
45import java.util.Collection;
46import java.util.Collections;
47import java.util.HashMap;
48import java.util.Map;
49import java.util.Optional;
50import java.util.Set;
51import java.util.concurrent.BlockingQueue;
52import java.util.concurrent.ExecutorService;
53import java.util.concurrent.LinkedBlockingQueue;
54import java.util.concurrent.ThreadFactory;
55import java.util.function.Function;
56import java.util.stream.Collectors;
57
58import static java.util.concurrent.Executors.newSingleThreadExecutor;
59import static org.onlab.util.Tools.groupedThreads;
60
61/**
62 * Implementation of the unicast route service.
63 */
64@Service
65@Component
66public class RouteManager implements ListenerService<RouteEvent, RouteListener>,
67 RouteService, RouteAdminService {
68
69 private final Logger log = LoggerFactory.getLogger(getClass());
70
71 private RouteStoreDelegate delegate = new InternalRouteStoreDelegate();
72 private InternalHostListener hostListener = new InternalHostListener();
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected RouteStore routeStore;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected HostService hostService;
79
80 @GuardedBy(value = "this")
81 private Map<RouteListener, ListenerQueue> listeners = new HashMap<>();
82
83 private ThreadFactory threadFactory;
84
85 @Activate
86 protected void activate() {
87 threadFactory = groupedThreads("onos/route", "listener-%d");
88
89 routeStore.setDelegate(delegate);
90 hostService.addListener(hostListener);
91
92 }
93
94 @Deactivate
95 protected void deactivate() {
96 listeners.values().forEach(l -> l.stop());
97
98 routeStore.unsetDelegate(delegate);
99 hostService.removeListener(hostListener);
100 }
101
102 /**
103 * {@inheritDoc}
104 *
105 * In a departure from other services in ONOS, calling addListener will
106 * cause all current routes to be pushed to the listener before any new
107 * events are sent. This allows a listener to easily get the exact set of
108 * routes without worrying about missing any.
109 *
110 * @param listener listener to be added
111 */
112 @Override
113 public void addListener(RouteListener listener) {
114 synchronized (this) {
115 log.debug("Synchronizing current routes to new listener");
116 ListenerQueue l = new ListenerQueue(listener);
117 routeStore.getRouteTables().forEach(table -> {
118 Collection<Route> routes = routeStore.getRoutes(table);
119 if (routes != null) {
120 routes.forEach(route ->
121 l.post(new RouteEvent(RouteEvent.Type.ROUTE_UPDATED,
122 new ResolvedRoute(route, routeStore.getNextHop(route.nextHop())))));
123 }
124 });
125
126 listeners.put(listener, l);
127
128 l.start();
129 log.debug("Route synchronization complete");
130 }
131 }
132
133 @Override
134 public void removeListener(RouteListener listener) {
135 synchronized (this) {
136 ListenerQueue l = listeners.remove(listener);
137 if (l != null) {
138 l.stop();
139 }
140 }
141 }
142
143 /**
144 * Posts an event to all listeners.
145 *
146 * @param event event
147 */
148 private void post(RouteEvent event) {
149 synchronized (this) {
150 listeners.values().forEach(l -> l.post(event));
151 }
152 }
153
154 @Override
155 public Map<RouteTableId, Collection<Route>> getAllRoutes() {
156 return routeStore.getRouteTables().stream()
157 .collect(Collectors.toMap(Function.identity(),
158 table -> (table == null) ?
159 Collections.emptySet() : routeStore.getRoutes(table)));
160 }
161
162 @Override
163 public Route longestPrefixMatch(IpAddress ip) {
164 return routeStore.longestPrefixMatch(ip);
165 }
166
167 @Override
168 public void update(Collection<Route> routes) {
169 synchronized (this) {
170 routes.forEach(route -> {
171 routeStore.updateRoute(route);
172 resolve(route);
173 });
174 }
175 }
176
177 @Override
178 public void withdraw(Collection<Route> routes) {
179 synchronized (this) {
180 routes.forEach(route -> routeStore.removeRoute(route));
181 }
182 }
183
184 private void resolve(Route route) {
185 // Monitor the IP address for updates of the MAC address
186 hostService.startMonitoringIp(route.nextHop());
187
188 MacAddress nextHopMac = routeStore.getNextHop(route.nextHop());
189 if (nextHopMac == null) {
190 Set<Host> hosts = hostService.getHostsByIp(route.nextHop());
191 Optional<Host> host = hosts.stream().findFirst();
192 if (host.isPresent()) {
193 nextHopMac = host.get().mac();
194 }
195 }
196
197 if (nextHopMac != null) {
198 routeStore.updateNextHop(route.nextHop(), nextHopMac);
199 }
200 }
201
202 private void hostUpdated(Host host) {
203 synchronized (this) {
204 for (IpAddress ip : host.ipAddresses()) {
205 routeStore.updateNextHop(ip, host.mac());
206 }
207 }
208 }
209
210 private void hostRemoved(Host host) {
211 synchronized (this) {
212 for (IpAddress ip : host.ipAddresses()) {
213 routeStore.removeNextHop(ip, host.mac());
214 }
215 }
216 }
217
218 /**
219 * Queues updates for a route listener to ensure they are received in the
220 * correct order.
221 */
222 private class ListenerQueue {
223
224 private final ExecutorService executorService;
225 private final BlockingQueue<RouteEvent> queue;
226 private final RouteListener listener;
227
228 /**
229 * Creates a new listener queue.
230 *
231 * @param listener route listener to queue updates for
232 */
233 public ListenerQueue(RouteListener listener) {
234 this.listener = listener;
235 queue = new LinkedBlockingQueue<>();
236 executorService = newSingleThreadExecutor(threadFactory);
237 }
238
239 /**
240 * Posts and event to the listener.
241 *
242 * @param event event
243 */
244 public void post(RouteEvent event) {
245 queue.add(event);
246 }
247
248 /**
249 * Initiates event delivery to the listener.
250 */
251 public void start() {
252 executorService.execute(this::poll);
253 }
254
255 /**
256 * Halts event delivery to the listener.
257 */
258 public void stop() {
259 executorService.shutdown();
260 }
261
262 private void poll() {
263 try {
264 while (true) {
265 listener.event(queue.take());
266 }
267 } catch (InterruptedException e) {
268 log.info("Route listener event thread shutting down: {}", e.getMessage());
269 }
270 }
271
272 }
273
274 /**
275 * Delegate to receive events from the route store.
276 */
277 private class InternalRouteStoreDelegate implements RouteStoreDelegate {
278 @Override
279 public void notify(RouteEvent event) {
280 post(event);
281 }
282 }
283
284 /**
285 * Internal listener for host events.
286 */
287 private class InternalHostListener implements HostListener {
288 @Override
289 public void event(HostEvent event) {
290 switch (event.type()) {
291 case HOST_ADDED:
292 case HOST_UPDATED:
293 hostUpdated(event.subject());
294 break;
295 case HOST_REMOVED:
296 hostRemoved(event.subject());
297 break;
298 case HOST_MOVED:
299 break;
300 default:
301 break;
302 }
303 }
304 }
305
306}