blob: c3f737fdfa36470b7b80e26644b0588b2dacbca3 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.segmentrouting;
18
19import com.google.common.collect.ImmutableSet;
20import org.onlab.packet.MacAddress;
21import org.onlab.util.KryoNamespace;
22import org.onosproject.net.ConnectPoint;
23import org.onosproject.net.DeviceId;
24import org.onosproject.net.PortNumber;
25import org.onosproject.net.config.NetworkConfigEvent;
26import org.onosproject.net.flow.DefaultTrafficSelector;
27import org.onosproject.net.flow.DefaultTrafficTreatment;
28import org.onosproject.net.flow.TrafficSelector;
29import org.onosproject.net.flow.TrafficTreatment;
30import org.onosproject.net.flow.criteria.Criteria;
31import org.onosproject.net.flowobjective.DefaultFilteringObjective;
32import org.onosproject.net.flowobjective.DefaultForwardingObjective;
33import org.onosproject.net.flowobjective.DefaultNextObjective;
34import org.onosproject.net.flowobjective.DefaultObjectiveContext;
35import org.onosproject.net.flowobjective.FilteringObjective;
36import org.onosproject.net.flowobjective.ForwardingObjective;
37import org.onosproject.net.flowobjective.NextObjective;
38import org.onosproject.net.flowobjective.Objective;
39import org.onosproject.net.flowobjective.ObjectiveContext;
40import org.onosproject.net.flowobjective.ObjectiveError;
41import org.onosproject.segmentrouting.config.XConnectConfig;
42import org.onosproject.segmentrouting.storekey.XConnectStoreKey;
43import org.onosproject.store.serializers.KryoNamespaces;
44import org.onosproject.store.service.ConsistentMap;
45import org.onosproject.store.service.Serializer;
46import org.onosproject.store.service.StorageService;
Charles Chan82f19972016-05-17 13:13:55 -070047import org.slf4j.Logger;
48import org.slf4j.LoggerFactory;
49
Charles Chan82f19972016-05-17 13:13:55 -070050import java.util.Set;
51import java.util.concurrent.CompletableFuture;
52import java.util.stream.Collectors;
53
54/**
55 * Handles cross connect related events.
56 */
57public class XConnectHandler {
58 private static final Logger log = LoggerFactory.getLogger(XConnectHandler.class);
59 private static final String CONFIG_NOT_FOUND = "XConnect config missing";
60 private static final String NOT_MASTER = "Not master controller";
61 private final SegmentRoutingManager srManager;
62 private final StorageService storageService;
63 private final ConsistentMap<XConnectStoreKey, NextObjective> xConnectNextObjStore;
64 private final KryoNamespace.Builder xConnectKryo;
65
66 protected XConnectHandler(SegmentRoutingManager srManager) {
67 this.srManager = srManager;
68 this.storageService = srManager.storageService;
69 xConnectKryo = new KryoNamespace.Builder()
70 .register(KryoNamespaces.API)
71 .register(XConnectStoreKey.class)
72 .register(NextObjContext.class);
73 xConnectNextObjStore = storageService
74 .<XConnectStoreKey, NextObjective>consistentMapBuilder()
75 .withName("onos-xconnect-nextobj-store")
76 .withSerializer(Serializer.using(xConnectKryo.build()))
77 .build();
78 }
79
80 /**
81 * Read initial XConnect for given device.
82 *
83 * @param deviceId ID of the device to be initialized
84 */
85 public void init(DeviceId deviceId) {
86 // Try to read XConnect config
87 XConnectConfig config =
88 srManager.cfgService.getConfig(srManager.appId, XConnectConfig.class);
89 if (config == null) {
90 log.warn("Failed to read XConnect config: {}", CONFIG_NOT_FOUND);
91 return;
92 }
93
94 config.getXconnects(deviceId).forEach(key -> {
95 populateXConnect(key, config.getPorts(key));
96 });
97 }
98
99 /**
100 * Processes Segment Routing App Config added event.
101 *
102 * @param event network config added event
103 */
104 protected void processXConnectConfigAdded(NetworkConfigEvent event) {
105 log.info("Processing XConnect CONFIG_ADDED");
106 XConnectConfig config = (XConnectConfig) event.config().get();
107 config.getXconnects().forEach(key -> {
108 populateXConnect(key, config.getPorts(key));
109 });
110 }
111
112 /**
113 * Processes Segment Routing App Config updated event.
114 *
115 * @param event network config updated event
116 */
117 protected void processXConnectConfigUpdated(NetworkConfigEvent event) {
118 log.info("Processing XConnect CONFIG_UPDATED");
119 XConnectConfig prevConfig = (XConnectConfig) event.prevConfig().get();
120 XConnectConfig config = (XConnectConfig) event.config().get();
121 Set<XConnectStoreKey> prevKeys = prevConfig.getXconnects();
122 Set<XConnectStoreKey> keys = config.getXconnects();
123
124 Set<XConnectStoreKey> pendingRemove = prevKeys.stream()
125 .filter(key -> !keys.contains(key)).collect(Collectors.toSet());
126 Set<XConnectStoreKey> pendingAdd = keys.stream()
127 .filter(key -> !prevKeys.contains(key)).collect(Collectors.toSet());
128 Set<XConnectStoreKey> pendingUpdate = keys.stream()
129 .filter(key -> prevKeys.contains(key) &&
130 !config.getPorts(key).equals(prevConfig.getPorts(key)))
131 .collect(Collectors.toSet());
132
133 pendingRemove.forEach(key -> {
134 revokeXConnect(key, prevConfig.getPorts(key));
135 });
136 pendingAdd.forEach(key -> {
137 populateXConnect(key, config.getPorts(key));
138 });
139 pendingUpdate.forEach(key -> {
140 updateXConnect(key, prevConfig.getPorts(key), config.getPorts(key));
141 });
142 }
143
144 /**
145 * Processes Segment Routing App Config removed event.
146 *
147 * @param event network config removed event
148 */
149 protected void processXConnectConfigRemoved(NetworkConfigEvent event) {
150 log.info("Processing XConnect CONFIG_REMOVED");
151 XConnectConfig prevConfig = (XConnectConfig) event.prevConfig().get();
152 prevConfig.getXconnects().forEach(key -> {
153 revokeXConnect(key, prevConfig.getPorts(key));
154 });
155 }
156
157 /**
158 * Checks if there is any XConnect configured on given connect point.
159 *
160 * @param cp connect point
161 * @return true if there is XConnect configured on given connect point.
162 */
163 public boolean hasXConnect(ConnectPoint cp) {
164 // Try to read XConnect config
165 XConnectConfig config =
166 srManager.cfgService.getConfig(srManager.appId, XConnectConfig.class);
167 if (config == null) {
168 log.warn("Failed to read XConnect config: {}", CONFIG_NOT_FOUND);
169 return false;
170 }
171 return config.getXconnects(cp.deviceId()).stream()
172 .anyMatch(key -> config.getPorts(key).contains(cp.port()));
173 }
174
175 /**
176 * Populates XConnect groups and flows for given key.
177 *
178 * @param key XConnect key
179 * @param ports a set of ports to be cross-connected
180 */
181 private void populateXConnect(XConnectStoreKey key, Set<PortNumber> ports) {
182 if (!srManager.mastershipService.isLocalMaster(key.deviceId())) {
183 log.info("Abort populating XConnect {}: {}", key, NOT_MASTER);
184 return;
185 }
186 populateFilter(key, ports);
187 populateFwd(key, populateNext(key, ports));
188 }
189
190 /**
191 * Populates filtering objectives for given XConnect.
192 *
193 * @param key XConnect store key
194 * @param ports XConnect ports
195 */
196 private void populateFilter(XConnectStoreKey key, Set<PortNumber> ports) {
197 ports.forEach(port -> {
198 FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
199 ObjectiveContext context = new DefaultObjectiveContext(
200 (objective) -> log.debug("XConnect FilterObj for {} on port {} populated",
201 key, port),
202 (objective, error) ->
203 log.warn("Failed to populate XConnect FilterObj for {} on port {}: {}",
204 key, port, error));
205 srManager.flowObjectiveService.filter(key.deviceId(), filtObjBuilder.add(context));
206 });
207 }
208
209 /**
210 * Populates next objectives for given XConnect.
211 *
212 * @param key XConnect store key
213 * @param ports XConnect ports
214 */
215 private NextObjective populateNext(XConnectStoreKey key, Set<PortNumber> ports) {
216 NextObjective nextObj = null;
217 if (xConnectNextObjStore.containsKey(key)) {
218 nextObj = xConnectNextObjStore.get(key).value();
219 log.debug("NextObj for {} found, id={}", key, nextObj.id());
220 } else {
221 NextObjective.Builder nextObjBuilder = nextObjBuilder(key, ports);
222 ObjectiveContext nextContext = new NextObjContext(Objective.Operation.ADD, key);
223 nextObj = nextObjBuilder.add(nextContext);
224 srManager.flowObjectiveService.next(key.deviceId(), nextObj);
225 xConnectNextObjStore.put(key, nextObj);
226 log.debug("NextObj for {} not found. Creating new NextObj with id={}", key, nextObj.id());
227 }
228 return nextObj;
229 }
230
231 /**
232 * Populates forwarding objectives for given XConnect.
233 *
234 * @param key XConnect store key
235 * @param nextObj next objective
236 */
237 private void populateFwd(XConnectStoreKey key, NextObjective nextObj) {
238 ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextObj.id());
239 ObjectiveContext fwdContext = new DefaultObjectiveContext(
240 (objective) -> log.debug("XConnect FwdObj for {} populated", key),
241 (objective, error) ->
242 log.warn("Failed to populate XConnect FwdObj for {}: {}", key, error));
243 srManager.flowObjectiveService.forward(key.deviceId(), fwdObjBuilder.add(fwdContext));
244 }
245
246 /**
247 * Revokes XConnect groups and flows for given key.
248 *
249 * @param key XConnect key
250 * @param ports XConnect ports
251 */
252 private void revokeXConnect(XConnectStoreKey key, Set<PortNumber> ports) {
253 if (!srManager.mastershipService.isLocalMaster(key.deviceId())) {
254 log.info("Abort populating XConnect {}: {}", key, NOT_MASTER);
255 return;
256 }
257
258 revokeFilter(key, ports);
259 if (xConnectNextObjStore.containsKey(key)) {
260 NextObjective nextObj = xConnectNextObjStore.get(key).value();
261 revokeFwd(key, nextObj, null);
262 revokeNext(key, nextObj, null);
263 } else {
264 log.warn("NextObj for {} does not exist in the store.", key);
265 }
266 }
267
268 /**
269 * Revokes filtering objectives for given XConnect.
270 *
271 * @param key XConnect store key
272 * @param ports XConnect ports
273 */
274 private void revokeFilter(XConnectStoreKey key, Set<PortNumber> ports) {
275 ports.forEach(port -> {
276 FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
277 ObjectiveContext context = new DefaultObjectiveContext(
278 (objective) -> log.debug("XConnect FilterObj for {} on port {} revoked",
279 key, port),
280 (objective, error) ->
281 log.warn("Failed to revoke XConnect FilterObj for {} on port {}: {}",
282 key, port, error));
283 srManager.flowObjectiveService.filter(key.deviceId(), filtObjBuilder.remove(context));
284 });
285 }
286
287 /**
288 * Revokes next objectives for given XConnect.
289 *
290 * @param key XConnect store key
291 * @param nextObj next objective
292 * @param nextFuture completable future for this next objective operation
293 */
294 private void revokeNext(XConnectStoreKey key, NextObjective nextObj,
295 CompletableFuture<ObjectiveError> nextFuture) {
296 ObjectiveContext context = new ObjectiveContext() {
297 @Override
298 public void onSuccess(Objective objective) {
299 log.debug("Previous NextObj for {} removed", key);
300 if (nextFuture != null) {
301 nextFuture.complete(null);
302 }
303 }
304
305 @Override
306 public void onError(Objective objective, ObjectiveError error) {
307 log.warn("Failed to remove previous NextObj for {}: {}", key, error);
308 if (nextFuture != null) {
309 nextFuture.complete(error);
310 }
311 }
312 };
313 srManager.flowObjectiveService.next(key.deviceId(),
314 (NextObjective) nextObj.copy().remove(context));
315 xConnectNextObjStore.remove(key);
316 }
317
318 /**
319 * Revokes forwarding objectives for given XConnect.
320 *
321 * @param key XConnect store key
322 * @param nextObj next objective
323 * @param fwdFuture completable future for this forwarding objective operation
324 */
325 private void revokeFwd(XConnectStoreKey key, NextObjective nextObj,
326 CompletableFuture<ObjectiveError> fwdFuture) {
327 ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextObj.id());
328 ObjectiveContext context = new ObjectiveContext() {
329 @Override
330 public void onSuccess(Objective objective) {
331 log.debug("Previous FwdObj for {} removed", key);
332 if (fwdFuture != null) {
333 fwdFuture.complete(null);
334 }
335 }
336
337 @Override
338 public void onError(Objective objective, ObjectiveError error) {
339 log.warn("Failed to remove previous FwdObj for {}: {}", key, error);
340 if (fwdFuture != null) {
341 fwdFuture.complete(error);
342 }
343 }
344 };
345 srManager.flowObjectiveService
346 .forward(key.deviceId(), fwdObjBuilder.remove(context));
347 }
348
349 /**
350 * Updates XConnect groups and flows for given key.
351 *
352 * @param key XConnect key
353 * @param prevPorts previous XConnect ports
354 * @param ports new XConnect ports
355 */
356 private void updateXConnect(XConnectStoreKey key, Set<PortNumber> prevPorts,
357 Set<PortNumber> ports) {
358 // remove old filter
359 prevPorts.stream().filter(port -> !ports.contains(port)).forEach(port -> {
360 revokeFilter(key, ImmutableSet.of(port));
361 });
362 // install new filter
363 ports.stream().filter(port -> !prevPorts.contains(port)).forEach(port -> {
364 populateFilter(key, ImmutableSet.of(port));
365 });
366
367 CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
368 CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
369
370 if (xConnectNextObjStore.containsKey(key)) {
371 NextObjective nextObj = xConnectNextObjStore.get(key).value();
372 revokeFwd(key, nextObj, fwdFuture);
373
374 fwdFuture.thenAcceptAsync(fwdStatus -> {
375 if (fwdStatus == null) {
376 log.debug("Fwd removed. Now remove group {}", key);
377 revokeNext(key, nextObj, nextFuture);
378 }
379 });
380
381 nextFuture.thenAcceptAsync(nextStatus -> {
382 if (nextStatus == null) {
383 log.debug("Installing new group and flow for {}", key);
384 populateFwd(key, populateNext(key, ports));
385 }
386 });
387 } else {
388 log.warn("NextObj for {} does not exist in the store.", key);
389 }
390 }
391
392 /**
393 * Remove all groups on given device.
394 *
395 * @param deviceId device ID
396 */
397 protected void removeDevice(DeviceId deviceId) {
Charles Chanfc115892016-06-17 14:28:07 -0700398 xConnectNextObjStore.entrySet().stream()
399 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
400 .forEach(entry -> {
401 xConnectNextObjStore.remove(entry.getKey());
402 });
403 log.debug("{} is removed from xConnectNextObjStore", deviceId);
Charles Chan82f19972016-05-17 13:13:55 -0700404 }
405
406 /**
407 * Creates a next objective builder for XConnect.
408 *
409 * @param key XConnect key
410 * @param ports set of XConnect ports
411 * @return next objective builder
412 */
413 private NextObjective.Builder nextObjBuilder(XConnectStoreKey key, Set<PortNumber> ports) {
414 int nextId = srManager.flowObjectiveService.allocateNextId();
415 TrafficSelector metadata =
416 DefaultTrafficSelector.builder().matchVlanId(key.vlanId()).build();
417 NextObjective.Builder nextObjBuilder = DefaultNextObjective
418 .builder().withId(nextId)
419 .withType(NextObjective.Type.BROADCAST).fromApp(srManager.appId)
420 .withMeta(metadata);
421 ports.forEach(port -> {
422 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
423 tBuilder.setOutput(port);
424 nextObjBuilder.addTreatment(tBuilder.build());
425 });
426 return nextObjBuilder;
427 }
428
429 /**
430 * Creates a forwarding objective builder for XConnect.
431 *
432 * @param key XConnect key
433 * @param nextId next ID of the broadcast group for this XConnect key
434 * @return next objective builder
435 */
436 private ForwardingObjective.Builder fwdObjBuilder(XConnectStoreKey key, int nextId) {
437 /*
438 * Driver should treat objectives with MacAddress.NONE and !VlanId.NONE
439 * as the VLAN cross-connect broadcast rules
440 */
441 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
442 sbuilder.matchVlanId(key.vlanId());
443 sbuilder.matchEthDst(MacAddress.NONE);
444
445 ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
446 fob.withFlag(ForwardingObjective.Flag.SPECIFIC)
447 .withSelector(sbuilder.build())
448 .nextStep(nextId)
449 .withPriority(SegmentRoutingService.XCONNECT_PRIORITY)
450 .fromApp(srManager.appId)
451 .makePermanent();
452 return fob;
453 }
454
455 /**
456 * Creates a filtering objective builder for XConnect.
457 *
458 * @param key XConnect key
459 * @param port XConnect ports
460 * @return next objective builder
461 */
462 private FilteringObjective.Builder filterObjBuilder(XConnectStoreKey key, PortNumber port) {
463 FilteringObjective.Builder fob = DefaultFilteringObjective.builder();
464 fob.withKey(Criteria.matchInPort(port))
465 .addCondition(Criteria.matchVlanId(key.vlanId()))
466 .addCondition(Criteria.matchEthDst(MacAddress.NONE))
467 .withPriority(SegmentRoutingService.XCONNECT_PRIORITY);
468 return fob.permit().fromApp(srManager.appId);
469 }
470
471 // TODO: Lambda closure in DefaultObjectiveContext cannot be serialized properly
472 // with Kryo 3.0.3. It will be fixed in 3.0.4. By then we can use
473 // DefaultObjectiveContext again.
474 private final class NextObjContext implements ObjectiveContext {
475 Objective.Operation op;
476 XConnectStoreKey key;
477
478 private NextObjContext(Objective.Operation op, XConnectStoreKey key) {
479 this.op = op;
480 this.key = key;
481 }
482
483 @Override
484 public void onSuccess(Objective objective) {
485 log.debug("XConnect NextObj for {} {}ED", key, op);
486 }
487
488 @Override
489 public void onError(Objective objective, ObjectiveError error) {
490 log.warn("Failed to {} XConnect NextObj for {}: {}", op, key, error);
491 }
492 }
493}