blob: 3c9f22392f75694881d3804daf7cf5e5a911025a [file] [log] [blame]
Mohammad Shahidaa7c1232017-08-09 11:13:15 +05301/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Ray Milkeya95193c2017-08-10 15:35:36 -070017package org.onosproject.evpnrouteservice.impl;
Mohammad Shahidaa7c1232017-08-09 11:13:15 +053018
Mohammad Shahidaa7c1232017-08-09 11:13:15 +053019import java.util.Collection;
20import java.util.HashMap;
21import java.util.Map;
22import java.util.concurrent.BlockingQueue;
23import java.util.concurrent.ExecutorService;
24import java.util.concurrent.LinkedBlockingQueue;
25import java.util.concurrent.ThreadFactory;
26
Ray Milkeya95193c2017-08-10 15:35:36 -070027import javax.annotation.concurrent.GuardedBy;
28
29import org.apache.felix.scr.annotations.Activate;
30import org.apache.felix.scr.annotations.Component;
31import org.apache.felix.scr.annotations.Deactivate;
32import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
34import org.apache.felix.scr.annotations.Service;
35
36import org.onosproject.evpnrouteservice.EvpnInternalRouteEvent;
37import org.onosproject.evpnrouteservice.EvpnRoute;
38import org.onosproject.evpnrouteservice.EvpnRouteAdminService;
39import org.onosproject.evpnrouteservice.EvpnRouteEvent;
40import org.onosproject.evpnrouteservice.EvpnRouteListener;
41import org.onosproject.evpnrouteservice.EvpnRouteService;
42import org.onosproject.evpnrouteservice.EvpnRouteSet;
43import org.onosproject.evpnrouteservice.EvpnRouteStore;
44import org.onosproject.evpnrouteservice.EvpnRouteStoreDelegate;
45import org.onosproject.evpnrouteservice.EvpnRouteTableId;
46import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
48
Mohammad Shahidaa7c1232017-08-09 11:13:15 +053049import static java.util.concurrent.Executors.newSingleThreadExecutor;
50import static org.onlab.util.Tools.groupedThreads;
51
52/**
53 * Implementation of the EVPN route service.
54 */
55@Service
56@Component
57public class EvpnRouteManager implements EvpnRouteService,
58 EvpnRouteAdminService {
59
60 private final Logger log = LoggerFactory.getLogger(getClass());
61
62 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 protected EvpnRouteStore evpnRouteStore;
64
65 @GuardedBy(value = "this")
66 private Map<EvpnRouteListener, EvpnListenerQueue> listeners = new
67 HashMap<>();
68
69 private ThreadFactory threadFactory;
70
71 private EvpnRouteStoreDelegate evpnRouteStoreDelegate = new
72 InternalEvpnRouteStoreDelegate();
73
74 @Activate
75 protected void activate() {
76 threadFactory = groupedThreads("onos/route", "listener-%d", log);
77 evpnRouteStore.setDelegate(evpnRouteStoreDelegate);
78
79 }
80
81 @Deactivate
82 protected void deactivate() {
83 evpnRouteStore.unsetDelegate(evpnRouteStoreDelegate);
Yuta HIGUCHI488a94c2018-01-26 17:24:09 -080084 synchronized (this) {
85 listeners.values().forEach(EvpnListenerQueue::stop);
86 }
Mohammad Shahidaa7c1232017-08-09 11:13:15 +053087 }
88
89 /**
90 * {@inheritDoc}
91 * <p>
92 * In a departure from other services in ONOS, calling addListener will
93 * cause all current routes to be pushed to the listener before any new
94 * events are sent. This allows a listener to easily get the exact set of
95 * routes without worrying about missing any.
96 *
97 * @param listener listener to be added
98 */
99 @Override
100 public void addListener(EvpnRouteListener listener) {
101 synchronized (this) {
102 EvpnListenerQueue l = createListenerQueue(listener);
103
104 evpnRouteStore.getRouteTables().forEach(routeTableId
105 -> {
106 Collection<EvpnRouteSet> routes
107 = evpnRouteStore.getRoutes(routeTableId);
108 if (routes != null) {
109 routes.forEach(route -> {
110 Collection<EvpnRoute> evpnRoutes = route.routes();
111 for (EvpnRoute evpnRoute : evpnRoutes) {
112 l.post(new EvpnRouteEvent(
113 EvpnRouteEvent.Type.ROUTE_ADDED,
114 evpnRoute,
115 route.routes()));
116 }
117 });
118 }
119 });
120 listeners.put(listener, l);
121
122 l.start();
123 log.debug("Route synchronization complete");
124 }
125 }
126
127 @Override
128 public void removeListener(EvpnRouteListener listener) {
129 synchronized (this) {
130 EvpnListenerQueue l = listeners.remove(listener);
131 if (l != null) {
132 l.stop();
133 }
134 }
135 }
136
137 /**
138 * Posts an event to all listeners.
139 *
140 * @param event event
141 */
142
143 private void post(EvpnRouteEvent event) {
144 if (event != null) {
145 log.debug("Sending event {}", event);
146 synchronized (this) {
147 listeners.values().forEach(l -> l.post(event));
148 }
149 }
150 }
151
152
Yuta HIGUCHI488a94c2018-01-26 17:24:09 -0800153 @Override
Jonathan Harte9c0c6e2017-08-09 16:44:13 -0700154 public Collection<EvpnRouteTableId> getRouteTables() {
Mohammad Shahidaa7c1232017-08-09 11:13:15 +0530155 return evpnRouteStore.getRouteTables();
156 }
157
158 @Override
159 public void update(Collection<EvpnRoute> routes) {
160 synchronized (this) {
161 routes.forEach(route -> {
162 log.debug("Received update {}", route);
163 evpnRouteStore.updateRoute(route);
164 });
165 }
166 }
167
168 @Override
169 public void withdraw(Collection<EvpnRoute> routes) {
170 synchronized (this) {
171 routes.forEach(route -> {
172 log.debug("Received withdraw {}", route);
173 evpnRouteStore.removeRoute(route);
174 });
175 }
176 }
177
178 /**
179 * Creates a new listener queue.
180 *
181 * @param listener route listener
182 * @return listener queue
183 */
184 DefaultListenerQueue createListenerQueue(EvpnRouteListener listener) {
185 return new DefaultListenerQueue(listener);
186 }
187
188 /**
189 * Default route listener queue.
190 */
191 private class DefaultListenerQueue implements EvpnListenerQueue {
192
193 private final ExecutorService executorService;
194 private final BlockingQueue<EvpnRouteEvent> queue;
195 private final EvpnRouteListener listener;
196
197 /**
198 * Creates a new listener queue.
199 *
200 * @param listener route listener to queue updates for
201 */
202 public DefaultListenerQueue(EvpnRouteListener listener) {
203 this.listener = listener;
204 queue = new LinkedBlockingQueue<>();
205 executorService = newSingleThreadExecutor(threadFactory);
206 }
207
208 @Override
209 public void post(EvpnRouteEvent event) {
210 queue.add(event);
211 }
212
213 @Override
214 public void start() {
215 executorService.execute(this::poll);
216 }
217
218 @Override
219 public void stop() {
220 executorService.shutdown();
221 }
222
223 private void poll() {
224 while (true) {
225 try {
226 listener.event(queue.take());
227 } catch (InterruptedException e) {
228 log.info("Route listener event thread shutting down: {}", e.getMessage());
Ray Milkey5c7d4882018-02-05 14:50:39 -0800229 Thread.currentThread().interrupt();
Mohammad Shahidaa7c1232017-08-09 11:13:15 +0530230 break;
231 } catch (Exception e) {
232 log.warn("Exception during route event handler", e);
233 }
234 }
235 }
236 }
237
238 /**
239 * Delegate to receive events from the route store.
240 */
241 private class InternalEvpnRouteStoreDelegate implements
242 EvpnRouteStoreDelegate {
243 EvpnRouteSet routes;
244
245 @Override
246 public void notify(EvpnInternalRouteEvent event) {
247 switch (event.type()) {
248 case ROUTE_ADDED:
249 routes = event.subject();
250 if (routes != null) {
251 Collection<EvpnRoute> evpnRoutes = routes.routes();
252 for (EvpnRoute evpnRoute : evpnRoutes) {
253 post(new EvpnRouteEvent(
254 EvpnRouteEvent.Type.ROUTE_ADDED,
255 evpnRoute,
256 routes.routes()));
257 }
258 }
259 break;
260 case ROUTE_REMOVED:
261 routes = event.subject();
262 if (routes != null) {
263 Collection<EvpnRoute> evpnRoutes = routes.routes();
264 for (EvpnRoute evpnRoute : evpnRoutes) {
265 post(new EvpnRouteEvent(
266 EvpnRouteEvent.Type.ROUTE_REMOVED,
267 evpnRoute,
268 routes.routes()));
269 }
270 }
271 break;
272 default:
273 break;
274 }
275 }
276 }
277
278}