blob: 19e63118d8e7a292d2caecd409462664bd100761 [file] [log] [blame]
Mohammad Shahidaa7c1232017-08-09 11:13:15 +05301/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Ray Milkeya95193c2017-08-10 15:35:36 -070017package org.onosproject.evpnrouteservice.impl;
Mohammad Shahidaa7c1232017-08-09 11:13:15 +053018
Mohammad Shahidaa7c1232017-08-09 11:13:15 +053019import java.util.Collection;
20import java.util.HashMap;
21import java.util.Map;
22import java.util.concurrent.BlockingQueue;
23import java.util.concurrent.ExecutorService;
24import java.util.concurrent.LinkedBlockingQueue;
25import java.util.concurrent.ThreadFactory;
26
Ray Milkeya95193c2017-08-10 15:35:36 -070027import javax.annotation.concurrent.GuardedBy;
28
29import org.apache.felix.scr.annotations.Activate;
30import org.apache.felix.scr.annotations.Component;
31import org.apache.felix.scr.annotations.Deactivate;
32import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
34import org.apache.felix.scr.annotations.Service;
35
36import org.onosproject.evpnrouteservice.EvpnInternalRouteEvent;
37import org.onosproject.evpnrouteservice.EvpnRoute;
38import org.onosproject.evpnrouteservice.EvpnRouteAdminService;
39import org.onosproject.evpnrouteservice.EvpnRouteEvent;
40import org.onosproject.evpnrouteservice.EvpnRouteListener;
41import org.onosproject.evpnrouteservice.EvpnRouteService;
42import org.onosproject.evpnrouteservice.EvpnRouteSet;
43import org.onosproject.evpnrouteservice.EvpnRouteStore;
44import org.onosproject.evpnrouteservice.EvpnRouteStoreDelegate;
45import org.onosproject.evpnrouteservice.EvpnRouteTableId;
46import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
48
Mohammad Shahidaa7c1232017-08-09 11:13:15 +053049import static java.util.concurrent.Executors.newSingleThreadExecutor;
50import static org.onlab.util.Tools.groupedThreads;
51
52/**
53 * Implementation of the EVPN route service.
54 */
55@Service
56@Component
57public class EvpnRouteManager implements EvpnRouteService,
58 EvpnRouteAdminService {
59
60 private final Logger log = LoggerFactory.getLogger(getClass());
61
62 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 protected EvpnRouteStore evpnRouteStore;
64
65 @GuardedBy(value = "this")
66 private Map<EvpnRouteListener, EvpnListenerQueue> listeners = new
67 HashMap<>();
68
69 private ThreadFactory threadFactory;
70
71 private EvpnRouteStoreDelegate evpnRouteStoreDelegate = new
72 InternalEvpnRouteStoreDelegate();
73
74 @Activate
75 protected void activate() {
76 threadFactory = groupedThreads("onos/route", "listener-%d", log);
77 evpnRouteStore.setDelegate(evpnRouteStoreDelegate);
78
79 }
80
81 @Deactivate
82 protected void deactivate() {
83 evpnRouteStore.unsetDelegate(evpnRouteStoreDelegate);
84 listeners.values().forEach(EvpnListenerQueue::stop);
85 }
86
87 /**
88 * {@inheritDoc}
89 * <p>
90 * In a departure from other services in ONOS, calling addListener will
91 * cause all current routes to be pushed to the listener before any new
92 * events are sent. This allows a listener to easily get the exact set of
93 * routes without worrying about missing any.
94 *
95 * @param listener listener to be added
96 */
97 @Override
98 public void addListener(EvpnRouteListener listener) {
99 synchronized (this) {
100 EvpnListenerQueue l = createListenerQueue(listener);
101
102 evpnRouteStore.getRouteTables().forEach(routeTableId
103 -> {
104 Collection<EvpnRouteSet> routes
105 = evpnRouteStore.getRoutes(routeTableId);
106 if (routes != null) {
107 routes.forEach(route -> {
108 Collection<EvpnRoute> evpnRoutes = route.routes();
109 for (EvpnRoute evpnRoute : evpnRoutes) {
110 l.post(new EvpnRouteEvent(
111 EvpnRouteEvent.Type.ROUTE_ADDED,
112 evpnRoute,
113 route.routes()));
114 }
115 });
116 }
117 });
118 listeners.put(listener, l);
119
120 l.start();
121 log.debug("Route synchronization complete");
122 }
123 }
124
125 @Override
126 public void removeListener(EvpnRouteListener listener) {
127 synchronized (this) {
128 EvpnListenerQueue l = listeners.remove(listener);
129 if (l != null) {
130 l.stop();
131 }
132 }
133 }
134
135 /**
136 * Posts an event to all listeners.
137 *
138 * @param event event
139 */
140
141 private void post(EvpnRouteEvent event) {
142 if (event != null) {
143 log.debug("Sending event {}", event);
144 synchronized (this) {
145 listeners.values().forEach(l -> l.post(event));
146 }
147 }
148 }
149
150
Jonathan Harte9c0c6e2017-08-09 16:44:13 -0700151 public Collection<EvpnRouteTableId> getRouteTables() {
Mohammad Shahidaa7c1232017-08-09 11:13:15 +0530152 return evpnRouteStore.getRouteTables();
153 }
154
155 @Override
156 public void update(Collection<EvpnRoute> routes) {
157 synchronized (this) {
158 routes.forEach(route -> {
159 log.debug("Received update {}", route);
160 evpnRouteStore.updateRoute(route);
161 });
162 }
163 }
164
165 @Override
166 public void withdraw(Collection<EvpnRoute> routes) {
167 synchronized (this) {
168 routes.forEach(route -> {
169 log.debug("Received withdraw {}", route);
170 evpnRouteStore.removeRoute(route);
171 });
172 }
173 }
174
175 /**
176 * Creates a new listener queue.
177 *
178 * @param listener route listener
179 * @return listener queue
180 */
181 DefaultListenerQueue createListenerQueue(EvpnRouteListener listener) {
182 return new DefaultListenerQueue(listener);
183 }
184
185 /**
186 * Default route listener queue.
187 */
188 private class DefaultListenerQueue implements EvpnListenerQueue {
189
190 private final ExecutorService executorService;
191 private final BlockingQueue<EvpnRouteEvent> queue;
192 private final EvpnRouteListener listener;
193
194 /**
195 * Creates a new listener queue.
196 *
197 * @param listener route listener to queue updates for
198 */
199 public DefaultListenerQueue(EvpnRouteListener listener) {
200 this.listener = listener;
201 queue = new LinkedBlockingQueue<>();
202 executorService = newSingleThreadExecutor(threadFactory);
203 }
204
205 @Override
206 public void post(EvpnRouteEvent event) {
207 queue.add(event);
208 }
209
210 @Override
211 public void start() {
212 executorService.execute(this::poll);
213 }
214
215 @Override
216 public void stop() {
217 executorService.shutdown();
218 }
219
220 private void poll() {
221 while (true) {
222 try {
223 listener.event(queue.take());
224 } catch (InterruptedException e) {
225 log.info("Route listener event thread shutting down: {}", e.getMessage());
226 break;
227 } catch (Exception e) {
228 log.warn("Exception during route event handler", e);
229 }
230 }
231 }
232 }
233
234 /**
235 * Delegate to receive events from the route store.
236 */
237 private class InternalEvpnRouteStoreDelegate implements
238 EvpnRouteStoreDelegate {
239 EvpnRouteSet routes;
240
241 @Override
242 public void notify(EvpnInternalRouteEvent event) {
243 switch (event.type()) {
244 case ROUTE_ADDED:
245 routes = event.subject();
246 if (routes != null) {
247 Collection<EvpnRoute> evpnRoutes = routes.routes();
248 for (EvpnRoute evpnRoute : evpnRoutes) {
249 post(new EvpnRouteEvent(
250 EvpnRouteEvent.Type.ROUTE_ADDED,
251 evpnRoute,
252 routes.routes()));
253 }
254 }
255 break;
256 case ROUTE_REMOVED:
257 routes = event.subject();
258 if (routes != null) {
259 Collection<EvpnRoute> evpnRoutes = routes.routes();
260 for (EvpnRoute evpnRoute : evpnRoutes) {
261 post(new EvpnRouteEvent(
262 EvpnRouteEvent.Type.ROUTE_REMOVED,
263 evpnRoute,
264 routes.routes()));
265 }
266 }
267 break;
268 default:
269 break;
270 }
271 }
272 }
273
274}