blob: f13b89a14b61333ef63cb602c43e0adc552ed5dc [file] [log] [blame]
Mohammad Shahidaa7c1232017-08-09 11:13:15 +05301/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Ray Milkeya95193c2017-08-10 15:35:36 -070017package org.onosproject.evpnrouteservice.impl;
Mohammad Shahidaa7c1232017-08-09 11:13:15 +053018
Ray Milkeya95193c2017-08-10 15:35:36 -070019import org.onosproject.evpnrouteservice.EvpnInternalRouteEvent;
20import org.onosproject.evpnrouteservice.EvpnRoute;
21import org.onosproject.evpnrouteservice.EvpnRouteAdminService;
22import org.onosproject.evpnrouteservice.EvpnRouteEvent;
23import org.onosproject.evpnrouteservice.EvpnRouteListener;
24import org.onosproject.evpnrouteservice.EvpnRouteService;
25import org.onosproject.evpnrouteservice.EvpnRouteSet;
26import org.onosproject.evpnrouteservice.EvpnRouteStore;
27import org.onosproject.evpnrouteservice.EvpnRouteStoreDelegate;
28import org.onosproject.evpnrouteservice.EvpnRouteTableId;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070029import org.osgi.service.component.annotations.Activate;
30import org.osgi.service.component.annotations.Component;
31import org.osgi.service.component.annotations.Deactivate;
32import org.osgi.service.component.annotations.Reference;
33import org.osgi.service.component.annotations.ReferenceCardinality;
Ray Milkeya95193c2017-08-10 15:35:36 -070034import org.slf4j.Logger;
35import org.slf4j.LoggerFactory;
36
Ray Milkeyd84f89b2018-08-17 14:54:17 -070037import javax.annotation.concurrent.GuardedBy;
38import java.util.Collection;
39import java.util.HashMap;
40import java.util.Map;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ExecutorService;
43import java.util.concurrent.LinkedBlockingQueue;
44import java.util.concurrent.ThreadFactory;
45
Mohammad Shahidaa7c1232017-08-09 11:13:15 +053046import static java.util.concurrent.Executors.newSingleThreadExecutor;
47import static org.onlab.util.Tools.groupedThreads;
48
49/**
50 * Implementation of the EVPN route service.
51 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070052@Component(service = { EvpnRouteService.class, EvpnRouteAdminService.class })
Mohammad Shahidaa7c1232017-08-09 11:13:15 +053053public class EvpnRouteManager implements EvpnRouteService,
54 EvpnRouteAdminService {
55
56 private final Logger log = LoggerFactory.getLogger(getClass());
57
Ray Milkeyd84f89b2018-08-17 14:54:17 -070058 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Mohammad Shahidaa7c1232017-08-09 11:13:15 +053059 protected EvpnRouteStore evpnRouteStore;
60
61 @GuardedBy(value = "this")
62 private Map<EvpnRouteListener, EvpnListenerQueue> listeners = new
63 HashMap<>();
64
65 private ThreadFactory threadFactory;
66
67 private EvpnRouteStoreDelegate evpnRouteStoreDelegate = new
68 InternalEvpnRouteStoreDelegate();
69
70 @Activate
71 protected void activate() {
72 threadFactory = groupedThreads("onos/route", "listener-%d", log);
73 evpnRouteStore.setDelegate(evpnRouteStoreDelegate);
74
75 }
76
77 @Deactivate
78 protected void deactivate() {
79 evpnRouteStore.unsetDelegate(evpnRouteStoreDelegate);
Yuta HIGUCHI488a94c2018-01-26 17:24:09 -080080 synchronized (this) {
81 listeners.values().forEach(EvpnListenerQueue::stop);
82 }
Mohammad Shahidaa7c1232017-08-09 11:13:15 +053083 }
84
85 /**
86 * {@inheritDoc}
87 * <p>
88 * In a departure from other services in ONOS, calling addListener will
89 * cause all current routes to be pushed to the listener before any new
90 * events are sent. This allows a listener to easily get the exact set of
91 * routes without worrying about missing any.
92 *
93 * @param listener listener to be added
94 */
95 @Override
96 public void addListener(EvpnRouteListener listener) {
97 synchronized (this) {
98 EvpnListenerQueue l = createListenerQueue(listener);
99
100 evpnRouteStore.getRouteTables().forEach(routeTableId
101 -> {
102 Collection<EvpnRouteSet> routes
103 = evpnRouteStore.getRoutes(routeTableId);
104 if (routes != null) {
105 routes.forEach(route -> {
106 Collection<EvpnRoute> evpnRoutes = route.routes();
107 for (EvpnRoute evpnRoute : evpnRoutes) {
108 l.post(new EvpnRouteEvent(
109 EvpnRouteEvent.Type.ROUTE_ADDED,
110 evpnRoute,
111 route.routes()));
112 }
113 });
114 }
115 });
116 listeners.put(listener, l);
117
118 l.start();
119 log.debug("Route synchronization complete");
120 }
121 }
122
123 @Override
124 public void removeListener(EvpnRouteListener listener) {
125 synchronized (this) {
126 EvpnListenerQueue l = listeners.remove(listener);
127 if (l != null) {
128 l.stop();
129 }
130 }
131 }
132
133 /**
134 * Posts an event to all listeners.
135 *
136 * @param event event
137 */
138
139 private void post(EvpnRouteEvent event) {
140 if (event != null) {
141 log.debug("Sending event {}", event);
142 synchronized (this) {
143 listeners.values().forEach(l -> l.post(event));
144 }
145 }
146 }
147
148
Yuta HIGUCHI488a94c2018-01-26 17:24:09 -0800149 @Override
Jonathan Harte9c0c6e2017-08-09 16:44:13 -0700150 public Collection<EvpnRouteTableId> getRouteTables() {
Mohammad Shahidaa7c1232017-08-09 11:13:15 +0530151 return evpnRouteStore.getRouteTables();
152 }
153
154 @Override
155 public void update(Collection<EvpnRoute> routes) {
156 synchronized (this) {
157 routes.forEach(route -> {
158 log.debug("Received update {}", route);
159 evpnRouteStore.updateRoute(route);
160 });
161 }
162 }
163
164 @Override
165 public void withdraw(Collection<EvpnRoute> routes) {
166 synchronized (this) {
167 routes.forEach(route -> {
168 log.debug("Received withdraw {}", route);
169 evpnRouteStore.removeRoute(route);
170 });
171 }
172 }
173
174 /**
175 * Creates a new listener queue.
176 *
177 * @param listener route listener
178 * @return listener queue
179 */
180 DefaultListenerQueue createListenerQueue(EvpnRouteListener listener) {
181 return new DefaultListenerQueue(listener);
182 }
183
184 /**
185 * Default route listener queue.
186 */
187 private class DefaultListenerQueue implements EvpnListenerQueue {
188
189 private final ExecutorService executorService;
190 private final BlockingQueue<EvpnRouteEvent> queue;
191 private final EvpnRouteListener listener;
192
193 /**
194 * Creates a new listener queue.
195 *
196 * @param listener route listener to queue updates for
197 */
198 public DefaultListenerQueue(EvpnRouteListener listener) {
199 this.listener = listener;
200 queue = new LinkedBlockingQueue<>();
201 executorService = newSingleThreadExecutor(threadFactory);
202 }
203
204 @Override
205 public void post(EvpnRouteEvent event) {
206 queue.add(event);
207 }
208
209 @Override
210 public void start() {
211 executorService.execute(this::poll);
212 }
213
214 @Override
215 public void stop() {
216 executorService.shutdown();
217 }
218
219 private void poll() {
220 while (true) {
221 try {
222 listener.event(queue.take());
223 } catch (InterruptedException e) {
224 log.info("Route listener event thread shutting down: {}", e.getMessage());
Ray Milkey5c7d4882018-02-05 14:50:39 -0800225 Thread.currentThread().interrupt();
Mohammad Shahidaa7c1232017-08-09 11:13:15 +0530226 break;
227 } catch (Exception e) {
228 log.warn("Exception during route event handler", e);
229 }
230 }
231 }
232 }
233
234 /**
235 * Delegate to receive events from the route store.
236 */
237 private class InternalEvpnRouteStoreDelegate implements
238 EvpnRouteStoreDelegate {
239 EvpnRouteSet routes;
240
241 @Override
242 public void notify(EvpnInternalRouteEvent event) {
243 switch (event.type()) {
244 case ROUTE_ADDED:
245 routes = event.subject();
246 if (routes != null) {
247 Collection<EvpnRoute> evpnRoutes = routes.routes();
248 for (EvpnRoute evpnRoute : evpnRoutes) {
249 post(new EvpnRouteEvent(
250 EvpnRouteEvent.Type.ROUTE_ADDED,
251 evpnRoute,
252 routes.routes()));
253 }
254 }
255 break;
256 case ROUTE_REMOVED:
257 routes = event.subject();
258 if (routes != null) {
259 Collection<EvpnRoute> evpnRoutes = routes.routes();
260 for (EvpnRoute evpnRoute : evpnRoutes) {
261 post(new EvpnRouteEvent(
262 EvpnRouteEvent.Type.ROUTE_REMOVED,
263 evpnRoute,
264 routes.routes()));
265 }
266 }
267 break;
268 default:
269 break;
270 }
271 }
272 }
273
274}