/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.incubator.net.routing.impl;
18
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onosproject.incubator.net.routing.EvpnInternalRouteEvent;
26import org.onosproject.incubator.net.routing.EvpnRoute;
27import org.onosproject.incubator.net.routing.EvpnRouteAdminService;
28import org.onosproject.incubator.net.routing.EvpnRouteEvent;
29import org.onosproject.incubator.net.routing.EvpnRouteListener;
30import org.onosproject.incubator.net.routing.EvpnRouteService;
31import org.onosproject.incubator.net.routing.EvpnRouteSet;
32import org.onosproject.incubator.net.routing.EvpnRouteStore;
33import org.onosproject.incubator.net.routing.EvpnRouteStoreDelegate;
34import org.onosproject.incubator.net.routing.RouteTableId;
35import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
38import javax.annotation.concurrent.GuardedBy;
39import java.util.Collection;
40import java.util.HashMap;
41import java.util.Map;
42import java.util.concurrent.BlockingQueue;
43import java.util.concurrent.ExecutorService;
44import java.util.concurrent.LinkedBlockingQueue;
45import java.util.concurrent.ThreadFactory;
46
47import static java.util.concurrent.Executors.newSingleThreadExecutor;
48import static org.onlab.util.Tools.groupedThreads;
49
50/**
51 * Implementation of the EVPN route service.
52 */
53@Service
54@Component
55public class EvpnRouteManager implements EvpnRouteService,
56 EvpnRouteAdminService {
57
58 private final Logger log = LoggerFactory.getLogger(getClass());
59
60 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
61 protected EvpnRouteStore evpnRouteStore;
62
63 @GuardedBy(value = "this")
64 private Map<EvpnRouteListener, EvpnListenerQueue> listeners = new
65 HashMap<>();
66
67 private ThreadFactory threadFactory;
68
69 private EvpnRouteStoreDelegate evpnRouteStoreDelegate = new
70 InternalEvpnRouteStoreDelegate();
71
72 @Activate
73 protected void activate() {
74 threadFactory = groupedThreads("onos/route", "listener-%d", log);
75 evpnRouteStore.setDelegate(evpnRouteStoreDelegate);
76
77 }
78
79 @Deactivate
80 protected void deactivate() {
81 evpnRouteStore.unsetDelegate(evpnRouteStoreDelegate);
82 listeners.values().forEach(EvpnListenerQueue::stop);
83 }
84
85 /**
86 * {@inheritDoc}
87 * <p>
88 * In a departure from other services in ONOS, calling addListener will
89 * cause all current routes to be pushed to the listener before any new
90 * events are sent. This allows a listener to easily get the exact set of
91 * routes without worrying about missing any.
92 *
93 * @param listener listener to be added
94 */
95 @Override
96 public void addListener(EvpnRouteListener listener) {
97 synchronized (this) {
98 EvpnListenerQueue l = createListenerQueue(listener);
99
100 evpnRouteStore.getRouteTables().forEach(routeTableId
101 -> {
102 Collection<EvpnRouteSet> routes
103 = evpnRouteStore.getRoutes(routeTableId);
104 if (routes != null) {
105 routes.forEach(route -> {
106 Collection<EvpnRoute> evpnRoutes = route.routes();
107 for (EvpnRoute evpnRoute : evpnRoutes) {
108 l.post(new EvpnRouteEvent(
109 EvpnRouteEvent.Type.ROUTE_ADDED,
110 evpnRoute,
111 route.routes()));
112 }
113 });
114 }
115 });
116 listeners.put(listener, l);
117
118 l.start();
119 log.debug("Route synchronization complete");
120 }
121 }
122
123 @Override
124 public void removeListener(EvpnRouteListener listener) {
125 synchronized (this) {
126 EvpnListenerQueue l = listeners.remove(listener);
127 if (l != null) {
128 l.stop();
129 }
130 }
131 }
132
133 /**
134 * Posts an event to all listeners.
135 *
136 * @param event event
137 */
138
139 private void post(EvpnRouteEvent event) {
140 if (event != null) {
141 log.debug("Sending event {}", event);
142 synchronized (this) {
143 listeners.values().forEach(l -> l.post(event));
144 }
145 }
146 }
147
148
149 public Collection<RouteTableId> getRouteTables() {
150 return evpnRouteStore.getRouteTables();
151 }
152
153 @Override
154 public void update(Collection<EvpnRoute> routes) {
155 synchronized (this) {
156 routes.forEach(route -> {
157 log.debug("Received update {}", route);
158 evpnRouteStore.updateRoute(route);
159 });
160 }
161 }
162
163 @Override
164 public void withdraw(Collection<EvpnRoute> routes) {
165 synchronized (this) {
166 routes.forEach(route -> {
167 log.debug("Received withdraw {}", route);
168 evpnRouteStore.removeRoute(route);
169 });
170 }
171 }
172
173 /**
174 * Creates a new listener queue.
175 *
176 * @param listener route listener
177 * @return listener queue
178 */
179 DefaultListenerQueue createListenerQueue(EvpnRouteListener listener) {
180 return new DefaultListenerQueue(listener);
181 }
182
183 /**
184 * Default route listener queue.
185 */
186 private class DefaultListenerQueue implements EvpnListenerQueue {
187
188 private final ExecutorService executorService;
189 private final BlockingQueue<EvpnRouteEvent> queue;
190 private final EvpnRouteListener listener;
191
192 /**
193 * Creates a new listener queue.
194 *
195 * @param listener route listener to queue updates for
196 */
197 public DefaultListenerQueue(EvpnRouteListener listener) {
198 this.listener = listener;
199 queue = new LinkedBlockingQueue<>();
200 executorService = newSingleThreadExecutor(threadFactory);
201 }
202
203 @Override
204 public void post(EvpnRouteEvent event) {
205 queue.add(event);
206 }
207
208 @Override
209 public void start() {
210 executorService.execute(this::poll);
211 }
212
213 @Override
214 public void stop() {
215 executorService.shutdown();
216 }
217
218 private void poll() {
219 while (true) {
220 try {
221 listener.event(queue.take());
222 } catch (InterruptedException e) {
223 log.info("Route listener event thread shutting down: {}", e.getMessage());
224 break;
225 } catch (Exception e) {
226 log.warn("Exception during route event handler", e);
227 }
228 }
229 }
230 }
231
232 /**
233 * Delegate to receive events from the route store.
234 */
235 private class InternalEvpnRouteStoreDelegate implements
236 EvpnRouteStoreDelegate {
237 EvpnRouteSet routes;
238
239 @Override
240 public void notify(EvpnInternalRouteEvent event) {
241 switch (event.type()) {
242 case ROUTE_ADDED:
243 routes = event.subject();
244 if (routes != null) {
245 Collection<EvpnRoute> evpnRoutes = routes.routes();
246 for (EvpnRoute evpnRoute : evpnRoutes) {
247 post(new EvpnRouteEvent(
248 EvpnRouteEvent.Type.ROUTE_ADDED,
249 evpnRoute,
250 routes.routes()));
251 }
252 }
253 break;
254 case ROUTE_REMOVED:
255 routes = event.subject();
256 if (routes != null) {
257 Collection<EvpnRoute> evpnRoutes = routes.routes();
258 for (EvpnRoute evpnRoute : evpnRoutes) {
259 post(new EvpnRouteEvent(
260 EvpnRouteEvent.Type.ROUTE_REMOVED,
261 evpnRoute,
262 routes.routes()));
263 }
264 }
265 break;
266 default:
267 break;
268 }
269 }
270 }
271
272}