blob: 0adf7ebd4ac4d12aab15fcda38b5a9a9568a5f54 [file] [log] [blame]
Charles Chanfc5c7802016-05-17 13:13:55 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.segmentrouting;
18
19import com.google.common.collect.ImmutableSet;
20import org.onlab.packet.MacAddress;
21import org.onlab.util.KryoNamespace;
22import org.onosproject.net.ConnectPoint;
23import org.onosproject.net.DeviceId;
24import org.onosproject.net.PortNumber;
25import org.onosproject.net.config.NetworkConfigEvent;
26import org.onosproject.net.flow.DefaultTrafficSelector;
27import org.onosproject.net.flow.DefaultTrafficTreatment;
28import org.onosproject.net.flow.TrafficSelector;
29import org.onosproject.net.flow.TrafficTreatment;
30import org.onosproject.net.flow.criteria.Criteria;
31import org.onosproject.net.flowobjective.DefaultFilteringObjective;
32import org.onosproject.net.flowobjective.DefaultForwardingObjective;
33import org.onosproject.net.flowobjective.DefaultNextObjective;
34import org.onosproject.net.flowobjective.DefaultObjectiveContext;
35import org.onosproject.net.flowobjective.FilteringObjective;
36import org.onosproject.net.flowobjective.ForwardingObjective;
37import org.onosproject.net.flowobjective.NextObjective;
38import org.onosproject.net.flowobjective.Objective;
39import org.onosproject.net.flowobjective.ObjectiveContext;
40import org.onosproject.net.flowobjective.ObjectiveError;
41import org.onosproject.segmentrouting.config.XConnectConfig;
42import org.onosproject.segmentrouting.storekey.XConnectStoreKey;
43import org.onosproject.store.serializers.KryoNamespaces;
44import org.onosproject.store.service.ConsistentMap;
45import org.onosproject.store.service.Serializer;
46import org.onosproject.store.service.StorageService;
47import org.onosproject.store.service.Versioned;
48import org.slf4j.Logger;
49import org.slf4j.LoggerFactory;
50
51import java.util.Iterator;
52import java.util.Map;
53import java.util.Set;
54import java.util.concurrent.CompletableFuture;
55import java.util.stream.Collectors;
56
57/**
58 * Handles cross connect related events.
59 */
60public class XConnectHandler {
61 private static final Logger log = LoggerFactory.getLogger(XConnectHandler.class);
62 private static final String CONFIG_NOT_FOUND = "XConnect config missing";
63 private static final String NOT_MASTER = "Not master controller";
64 private final SegmentRoutingManager srManager;
65 private final StorageService storageService;
66 private final ConsistentMap<XConnectStoreKey, NextObjective> xConnectNextObjStore;
67 private final KryoNamespace.Builder xConnectKryo;
68
69 protected XConnectHandler(SegmentRoutingManager srManager) {
70 this.srManager = srManager;
71 this.storageService = srManager.storageService;
72 xConnectKryo = new KryoNamespace.Builder()
73 .register(KryoNamespaces.API)
74 .register(XConnectStoreKey.class)
75 .register(NextObjContext.class);
76 xConnectNextObjStore = storageService
77 .<XConnectStoreKey, NextObjective>consistentMapBuilder()
78 .withName("onos-xconnect-nextobj-store")
79 .withSerializer(Serializer.using(xConnectKryo.build()))
80 .build();
81 }
82
83 /**
84 * Read initial XConnect for given device.
85 *
86 * @param deviceId ID of the device to be initialized
87 */
88 public void init(DeviceId deviceId) {
89 // Try to read XConnect config
90 XConnectConfig config =
91 srManager.cfgService.getConfig(srManager.appId, XConnectConfig.class);
92 if (config == null) {
93 log.warn("Failed to read XConnect config: {}", CONFIG_NOT_FOUND);
94 return;
95 }
96
97 config.getXconnects(deviceId).forEach(key -> {
98 populateXConnect(key, config.getPorts(key));
99 });
100 }
101
102 /**
103 * Processes Segment Routing App Config added event.
104 *
105 * @param event network config added event
106 */
107 protected void processXConnectConfigAdded(NetworkConfigEvent event) {
108 log.info("Processing XConnect CONFIG_ADDED");
109 XConnectConfig config = (XConnectConfig) event.config().get();
110 config.getXconnects().forEach(key -> {
111 populateXConnect(key, config.getPorts(key));
112 });
113 }
114
115 /**
116 * Processes Segment Routing App Config updated event.
117 *
118 * @param event network config updated event
119 */
120 protected void processXConnectConfigUpdated(NetworkConfigEvent event) {
121 log.info("Processing XConnect CONFIG_UPDATED");
122 XConnectConfig prevConfig = (XConnectConfig) event.prevConfig().get();
123 XConnectConfig config = (XConnectConfig) event.config().get();
124 Set<XConnectStoreKey> prevKeys = prevConfig.getXconnects();
125 Set<XConnectStoreKey> keys = config.getXconnects();
126
127 Set<XConnectStoreKey> pendingRemove = prevKeys.stream()
128 .filter(key -> !keys.contains(key)).collect(Collectors.toSet());
129 Set<XConnectStoreKey> pendingAdd = keys.stream()
130 .filter(key -> !prevKeys.contains(key)).collect(Collectors.toSet());
131 Set<XConnectStoreKey> pendingUpdate = keys.stream()
132 .filter(key -> prevKeys.contains(key) &&
133 !config.getPorts(key).equals(prevConfig.getPorts(key)))
134 .collect(Collectors.toSet());
135
136 pendingRemove.forEach(key -> {
137 revokeXConnect(key, prevConfig.getPorts(key));
138 });
139 pendingAdd.forEach(key -> {
140 populateXConnect(key, config.getPorts(key));
141 });
142 pendingUpdate.forEach(key -> {
143 updateXConnect(key, prevConfig.getPorts(key), config.getPorts(key));
144 });
145 }
146
147 /**
148 * Processes Segment Routing App Config removed event.
149 *
150 * @param event network config removed event
151 */
152 protected void processXConnectConfigRemoved(NetworkConfigEvent event) {
153 log.info("Processing XConnect CONFIG_REMOVED");
154 XConnectConfig prevConfig = (XConnectConfig) event.prevConfig().get();
155 prevConfig.getXconnects().forEach(key -> {
156 revokeXConnect(key, prevConfig.getPorts(key));
157 });
158 }
159
160 /**
161 * Checks if there is any XConnect configured on given connect point.
162 *
163 * @param cp connect point
164 * @return true if there is XConnect configured on given connect point.
165 */
166 public boolean hasXConnect(ConnectPoint cp) {
167 // Try to read XConnect config
168 XConnectConfig config =
169 srManager.cfgService.getConfig(srManager.appId, XConnectConfig.class);
170 if (config == null) {
171 log.warn("Failed to read XConnect config: {}", CONFIG_NOT_FOUND);
172 return false;
173 }
174 return config.getXconnects(cp.deviceId()).stream()
175 .anyMatch(key -> config.getPorts(key).contains(cp.port()));
176 }
177
178 /**
179 * Populates XConnect groups and flows for given key.
180 *
181 * @param key XConnect key
182 * @param ports a set of ports to be cross-connected
183 */
184 private void populateXConnect(XConnectStoreKey key, Set<PortNumber> ports) {
185 if (!srManager.mastershipService.isLocalMaster(key.deviceId())) {
186 log.info("Abort populating XConnect {}: {}", key, NOT_MASTER);
187 return;
188 }
189 populateFilter(key, ports);
190 populateFwd(key, populateNext(key, ports));
191 }
192
193 /**
194 * Populates filtering objectives for given XConnect.
195 *
196 * @param key XConnect store key
197 * @param ports XConnect ports
198 */
199 private void populateFilter(XConnectStoreKey key, Set<PortNumber> ports) {
200 ports.forEach(port -> {
201 FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
202 ObjectiveContext context = new DefaultObjectiveContext(
203 (objective) -> log.debug("XConnect FilterObj for {} on port {} populated",
204 key, port),
205 (objective, error) ->
206 log.warn("Failed to populate XConnect FilterObj for {} on port {}: {}",
207 key, port, error));
208 srManager.flowObjectiveService.filter(key.deviceId(), filtObjBuilder.add(context));
209 });
210 }
211
212 /**
213 * Populates next objectives for given XConnect.
214 *
215 * @param key XConnect store key
216 * @param ports XConnect ports
217 */
218 private NextObjective populateNext(XConnectStoreKey key, Set<PortNumber> ports) {
219 NextObjective nextObj = null;
220 if (xConnectNextObjStore.containsKey(key)) {
221 nextObj = xConnectNextObjStore.get(key).value();
222 log.debug("NextObj for {} found, id={}", key, nextObj.id());
223 } else {
224 NextObjective.Builder nextObjBuilder = nextObjBuilder(key, ports);
225 ObjectiveContext nextContext = new NextObjContext(Objective.Operation.ADD, key);
226 nextObj = nextObjBuilder.add(nextContext);
227 srManager.flowObjectiveService.next(key.deviceId(), nextObj);
228 xConnectNextObjStore.put(key, nextObj);
229 log.debug("NextObj for {} not found. Creating new NextObj with id={}", key, nextObj.id());
230 }
231 return nextObj;
232 }
233
234 /**
235 * Populates forwarding objectives for given XConnect.
236 *
237 * @param key XConnect store key
238 * @param nextObj next objective
239 */
240 private void populateFwd(XConnectStoreKey key, NextObjective nextObj) {
241 ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextObj.id());
242 ObjectiveContext fwdContext = new DefaultObjectiveContext(
243 (objective) -> log.debug("XConnect FwdObj for {} populated", key),
244 (objective, error) ->
245 log.warn("Failed to populate XConnect FwdObj for {}: {}", key, error));
246 srManager.flowObjectiveService.forward(key.deviceId(), fwdObjBuilder.add(fwdContext));
247 }
248
249 /**
250 * Revokes XConnect groups and flows for given key.
251 *
252 * @param key XConnect key
253 * @param ports XConnect ports
254 */
255 private void revokeXConnect(XConnectStoreKey key, Set<PortNumber> ports) {
256 if (!srManager.mastershipService.isLocalMaster(key.deviceId())) {
257 log.info("Abort populating XConnect {}: {}", key, NOT_MASTER);
258 return;
259 }
260
261 revokeFilter(key, ports);
262 if (xConnectNextObjStore.containsKey(key)) {
263 NextObjective nextObj = xConnectNextObjStore.get(key).value();
264 revokeFwd(key, nextObj, null);
265 revokeNext(key, nextObj, null);
266 } else {
267 log.warn("NextObj for {} does not exist in the store.", key);
268 }
269 }
270
271 /**
272 * Revokes filtering objectives for given XConnect.
273 *
274 * @param key XConnect store key
275 * @param ports XConnect ports
276 */
277 private void revokeFilter(XConnectStoreKey key, Set<PortNumber> ports) {
278 ports.forEach(port -> {
279 FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
280 ObjectiveContext context = new DefaultObjectiveContext(
281 (objective) -> log.debug("XConnect FilterObj for {} on port {} revoked",
282 key, port),
283 (objective, error) ->
284 log.warn("Failed to revoke XConnect FilterObj for {} on port {}: {}",
285 key, port, error));
286 srManager.flowObjectiveService.filter(key.deviceId(), filtObjBuilder.remove(context));
287 });
288 }
289
290 /**
291 * Revokes next objectives for given XConnect.
292 *
293 * @param key XConnect store key
294 * @param nextObj next objective
295 * @param nextFuture completable future for this next objective operation
296 */
297 private void revokeNext(XConnectStoreKey key, NextObjective nextObj,
298 CompletableFuture<ObjectiveError> nextFuture) {
299 ObjectiveContext context = new ObjectiveContext() {
300 @Override
301 public void onSuccess(Objective objective) {
302 log.debug("Previous NextObj for {} removed", key);
303 if (nextFuture != null) {
304 nextFuture.complete(null);
305 }
306 }
307
308 @Override
309 public void onError(Objective objective, ObjectiveError error) {
310 log.warn("Failed to remove previous NextObj for {}: {}", key, error);
311 if (nextFuture != null) {
312 nextFuture.complete(error);
313 }
314 }
315 };
316 srManager.flowObjectiveService.next(key.deviceId(),
317 (NextObjective) nextObj.copy().remove(context));
318 xConnectNextObjStore.remove(key);
319 }
320
321 /**
322 * Revokes forwarding objectives for given XConnect.
323 *
324 * @param key XConnect store key
325 * @param nextObj next objective
326 * @param fwdFuture completable future for this forwarding objective operation
327 */
328 private void revokeFwd(XConnectStoreKey key, NextObjective nextObj,
329 CompletableFuture<ObjectiveError> fwdFuture) {
330 ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextObj.id());
331 ObjectiveContext context = new ObjectiveContext() {
332 @Override
333 public void onSuccess(Objective objective) {
334 log.debug("Previous FwdObj for {} removed", key);
335 if (fwdFuture != null) {
336 fwdFuture.complete(null);
337 }
338 }
339
340 @Override
341 public void onError(Objective objective, ObjectiveError error) {
342 log.warn("Failed to remove previous FwdObj for {}: {}", key, error);
343 if (fwdFuture != null) {
344 fwdFuture.complete(error);
345 }
346 }
347 };
348 srManager.flowObjectiveService
349 .forward(key.deviceId(), fwdObjBuilder.remove(context));
350 }
351
352 /**
353 * Updates XConnect groups and flows for given key.
354 *
355 * @param key XConnect key
356 * @param prevPorts previous XConnect ports
357 * @param ports new XConnect ports
358 */
359 private void updateXConnect(XConnectStoreKey key, Set<PortNumber> prevPorts,
360 Set<PortNumber> ports) {
361 // remove old filter
362 prevPorts.stream().filter(port -> !ports.contains(port)).forEach(port -> {
363 revokeFilter(key, ImmutableSet.of(port));
364 });
365 // install new filter
366 ports.stream().filter(port -> !prevPorts.contains(port)).forEach(port -> {
367 populateFilter(key, ImmutableSet.of(port));
368 });
369
370 CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
371 CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
372
373 if (xConnectNextObjStore.containsKey(key)) {
374 NextObjective nextObj = xConnectNextObjStore.get(key).value();
375 revokeFwd(key, nextObj, fwdFuture);
376
377 fwdFuture.thenAcceptAsync(fwdStatus -> {
378 if (fwdStatus == null) {
379 log.debug("Fwd removed. Now remove group {}", key);
380 revokeNext(key, nextObj, nextFuture);
381 }
382 });
383
384 nextFuture.thenAcceptAsync(nextStatus -> {
385 if (nextStatus == null) {
386 log.debug("Installing new group and flow for {}", key);
387 populateFwd(key, populateNext(key, ports));
388 }
389 });
390 } else {
391 log.warn("NextObj for {} does not exist in the store.", key);
392 }
393 }
394
395 /**
396 * Remove all groups on given device.
397 *
398 * @param deviceId device ID
399 */
400 protected void removeDevice(DeviceId deviceId) {
401 Iterator<Map.Entry<XConnectStoreKey, Versioned<NextObjective>>> itNextObj =
402 xConnectNextObjStore.entrySet().iterator();
403 while (itNextObj.hasNext()) {
404 Map.Entry<XConnectStoreKey, Versioned<NextObjective>> entry = itNextObj.next();
405 if (entry.getKey().deviceId().equals(deviceId)) {
406 itNextObj.remove();
407 }
408 }
409 }
410
411 /**
412 * Creates a next objective builder for XConnect.
413 *
414 * @param key XConnect key
415 * @param ports set of XConnect ports
416 * @return next objective builder
417 */
418 private NextObjective.Builder nextObjBuilder(XConnectStoreKey key, Set<PortNumber> ports) {
419 int nextId = srManager.flowObjectiveService.allocateNextId();
420 TrafficSelector metadata =
421 DefaultTrafficSelector.builder().matchVlanId(key.vlanId()).build();
422 NextObjective.Builder nextObjBuilder = DefaultNextObjective
423 .builder().withId(nextId)
424 .withType(NextObjective.Type.BROADCAST).fromApp(srManager.appId)
425 .withMeta(metadata);
426 ports.forEach(port -> {
427 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
428 tBuilder.setOutput(port);
429 nextObjBuilder.addTreatment(tBuilder.build());
430 });
431 return nextObjBuilder;
432 }
433
434 /**
435 * Creates a forwarding objective builder for XConnect.
436 *
437 * @param key XConnect key
438 * @param nextId next ID of the broadcast group for this XConnect key
439 * @return next objective builder
440 */
441 private ForwardingObjective.Builder fwdObjBuilder(XConnectStoreKey key, int nextId) {
442 /*
443 * Driver should treat objectives with MacAddress.NONE and !VlanId.NONE
444 * as the VLAN cross-connect broadcast rules
445 */
446 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
447 sbuilder.matchVlanId(key.vlanId());
448 sbuilder.matchEthDst(MacAddress.NONE);
449
450 ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
451 fob.withFlag(ForwardingObjective.Flag.SPECIFIC)
452 .withSelector(sbuilder.build())
453 .nextStep(nextId)
454 .withPriority(SegmentRoutingService.XCONNECT_PRIORITY)
455 .fromApp(srManager.appId)
456 .makePermanent();
457 return fob;
458 }
459
460 /**
461 * Creates a filtering objective builder for XConnect.
462 *
463 * @param key XConnect key
464 * @param port XConnect ports
465 * @return next objective builder
466 */
467 private FilteringObjective.Builder filterObjBuilder(XConnectStoreKey key, PortNumber port) {
468 FilteringObjective.Builder fob = DefaultFilteringObjective.builder();
469 fob.withKey(Criteria.matchInPort(port))
470 .addCondition(Criteria.matchVlanId(key.vlanId()))
471 .addCondition(Criteria.matchEthDst(MacAddress.NONE))
472 .withPriority(SegmentRoutingService.XCONNECT_PRIORITY);
473 return fob.permit().fromApp(srManager.appId);
474 }
475
476 // TODO: Lambda closure in DefaultObjectiveContext cannot be serialized properly
477 // with Kryo 3.0.3. It will be fixed in 3.0.4. By then we can use
478 // DefaultObjectiveContext again.
479 private final class NextObjContext implements ObjectiveContext {
480 Objective.Operation op;
481 XConnectStoreKey key;
482
483 private NextObjContext(Objective.Operation op, XConnectStoreKey key) {
484 this.op = op;
485 this.key = key;
486 }
487
488 @Override
489 public void onSuccess(Objective objective) {
490 log.debug("XConnect NextObj for {} {}ED", key, op);
491 }
492
493 @Override
494 public void onError(Objective objective, ObjectiveError error) {
495 log.warn("Failed to {} XConnect NextObj for {}: {}", op, key, error);
496 }
497 }
498}