blob: 5169310f801315513097fea31942421356607add [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.vpls;
18
19import com.google.common.collect.Maps;
20import com.google.common.collect.Queues;
21import com.google.common.collect.Sets;
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onlab.util.Tools;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.LeadershipEvent;
31import org.onosproject.cluster.LeadershipEventListener;
32import org.onosproject.cluster.LeadershipService;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.core.ApplicationId;
35import org.onosproject.core.CoreService;
36import org.onosproject.incubator.net.intf.Interface;
37import org.onosproject.net.Host;
38import org.onosproject.net.host.HostService;
39import org.onosproject.net.intent.Intent;
40import org.onosproject.net.intent.IntentEvent;
41import org.onosproject.net.intent.IntentException;
42import org.onosproject.net.intent.IntentListener;
43import org.onosproject.net.intent.IntentService;
44import org.onosproject.net.intent.IntentUtils;
45import org.onosproject.net.intent.Key;
46import org.onosproject.net.intent.MultiPointToSinglePointIntent;
47import org.onosproject.net.intent.SinglePointToMultiPointIntent;
48import org.onosproject.vpls.api.VplsData;
49import org.onosproject.vpls.api.VplsOperationException;
50import org.onosproject.vpls.api.VplsOperationService;
51import org.onosproject.vpls.api.VplsOperation;
52import org.onosproject.vpls.api.VplsStore;
53import org.onosproject.vpls.intent.VplsIntentUtility;
54import org.slf4j.Logger;
55
56import java.util.Collection;
57import java.util.Deque;
58import java.util.Map;
59import java.util.Set;
60import java.util.concurrent.CompletableFuture;
61import java.util.concurrent.ExecutionException;
62import java.util.concurrent.ExecutorService;
63import java.util.concurrent.Executors;
64import java.util.concurrent.ScheduledExecutorService;
65import java.util.concurrent.TimeUnit;
66import java.util.concurrent.TimeoutException;
67import java.util.function.Consumer;
68import java.util.stream.Collectors;
69import java.util.stream.Stream;
70
71import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
72import static org.onlab.util.Tools.groupedThreads;
73import static org.slf4j.LoggerFactory.getLogger;
74
75/**
76 * An implementation of VplsOperationService.
77 * Handles the execution order of the VPLS operations generated by the
78 * application.
79 */
80@Component(immediate = true)
81@Service
82public class VplsOperationManager implements VplsOperationService {
83 private static final int NUM_THREADS = 4;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 protected CoreService coreService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
89 protected IntentService intentService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
92 protected LeadershipService leadershipService;
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95 protected ClusterService clusterService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98 protected HostService hostService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 protected VplsStore vplsStore;
102
103 private final Logger log = getLogger(getClass());
104 protected Map<String, Deque<VplsOperation>> pendingVplsOperations;
105 protected final Map<String, VplsOperation> runningOperations = Maps.newHashMap();
106 protected ScheduledExecutorService schedulerExecutor;
107 protected ExecutorService workerExecutor;
108 protected ApplicationId appId;
109 protected boolean isLeader;
110 protected NodeId localNodeId;
111 protected LeadershipEventListener leadershipEventListener;
112
113 @Activate
114 public void activate() {
115 appId = coreService.registerApplication(VplsManager.VPLS_APP);
116 localNodeId = clusterService.getLocalNode().id();
117
118 leadershipEventListener = new InternalLeadershipListener();
119 leadershipService.addListener(leadershipEventListener);
120 leadershipService.runForLeadership(appId.name());
121 pendingVplsOperations = Maps.newConcurrentMap();
122
123 // Thread pool for VplsOperationExecutor
124 workerExecutor = newFixedThreadPool(NUM_THREADS,
125 groupedThreads("onos/apps/vpls",
126 "worker-%d",
127 log));
128 // A single thread pool for VplsOperationScheduler
129 schedulerExecutor = Executors.newScheduledThreadPool(1,
130 groupedThreads("onos/apps/vpls",
131 "scheduler-%d",
132 log));
133 // Start the scheduler
134 schedulerExecutor.scheduleAtFixedRate(new VplsOperationScheduler(),
135 0,
136 500,
137 TimeUnit.MILLISECONDS);
138
139 }
140
141 @Deactivate
142 public void deactivate() {
143 pendingVplsOperations.clear();
144 runningOperations.clear();
145 leadershipService.removeListener(leadershipEventListener);
146 schedulerExecutor.shutdown();
147 workerExecutor.shutdown();
148
149 // remove all intents from VPLS application when deactivated
150 Tools.stream(intentService.getIntents())
151 .filter(intent -> intent.appId().equals(appId))
152 .forEach(intentService::withdraw);
153 }
154
155 @Override
156 public void submit(VplsOperation vplsOperation) {
157 if (isLeader) {
158 // Only leader can execute operation
159 addVplsOperation(vplsOperation);
160 }
161 }
162
163 /**
164 * Adds a VPLS operation to the queue of pending operations.
165 *
166 * @param vplsOperation the VPLS operation to add
167 */
168 private void addVplsOperation(VplsOperation vplsOperation) {
169 VplsData vplsData = vplsOperation.vpls();
170 pendingVplsOperations.compute(vplsData.name(), (name, opQueue) -> {
171 opQueue = opQueue == null ? Queues.newArrayDeque() : opQueue;
172
173 // If the operation already exist in queue, ignore it.
174 if (opQueue.contains(vplsOperation)) {
175 return opQueue;
176 }
177 opQueue.add(vplsOperation);
178 return opQueue;
179 });
180 }
181
182 /**
183 * Optimizes the VPLS operation queue and return a single VPLS operation to
184 * execute.
185 *
186 * @param operations the queue to be optimized
187 * @return optimized VPLS operation from the queue
188 */
189 protected static VplsOperation getOptimizedVplsOperation(Deque<VplsOperation> operations) {
190 if (operations.isEmpty()) {
191 return null;
192 }
193 // no need to optimize if the queue contains only one operation
194 if (operations.size() == 1) {
195 return operations.getFirst();
196 }
197
198 final VplsOperation firstOperation = operations.peekFirst();
199 final VplsOperation lastOperation = operations.peekLast();
200 final VplsOperation.Operation firstOp = firstOperation.op();
201 final VplsOperation.Operation lastOp = lastOperation.op();
202
203 if (firstOp.equals(VplsOperation.Operation.REMOVE)) {
204 if (lastOp.equals(VplsOperation.Operation.REMOVE)) {
205 // case 1: both first and last operation are REMOVE; do remove
206 return firstOperation;
207 } else if (lastOp.equals(VplsOperation.Operation.ADD)) {
208 // case 2: if first is REMOVE, and last is ADD; do update
209 return VplsOperation.of(lastOperation.vpls(),
210 VplsOperation.Operation.UPDATE);
211 } else {
212 // case 3: first is REMOVE, last is UPDATE; do update
213 return lastOperation;
214 }
215 } else if (firstOp.equals(VplsOperation.Operation.ADD)) {
216 if (lastOp.equals(VplsOperation.Operation.REMOVE)) {
217 // case 4: first is ADD, last is REMOVE; nothing to do
218 return null;
219 } else if (lastOp.equals(VplsOperation.Operation.ADD)) {
220 // case 5: both first and last are ADD, do add
221 return VplsOperation.of(lastOperation.vpls(),
222 VplsOperation.Operation.ADD);
223 } else {
224 // case 6: first is ADD and last is update, do add
225 return VplsOperation.of(lastOperation.vpls(),
226 VplsOperation.Operation.ADD);
227 }
228 } else {
229 if (lastOp.equals(VplsOperation.Operation.REMOVE)) {
230 // case 7: last is remove, do remove
231 return lastOperation;
232 } else if (lastOp.equals(VplsOperation.Operation.ADD)) {
233 // case 8: do update only
234 return VplsOperation.of(lastOperation.vpls(),
235 VplsOperation.Operation.UPDATE);
236 } else {
237 // case 9: from UPDATE to UPDATE
238 // only need last UPDATE operation
239 return VplsOperation.of(lastOperation.vpls(),
240 VplsOperation.Operation.UPDATE);
241 }
242 }
243 }
244
245 /**
246 * Scheduler for VPLS operation.
247 * Processes a batch of VPLS operations in a period.
248 */
249 class VplsOperationScheduler implements Runnable {
250 private static final String UNKNOWN_STATE =
251 "Unknown state {} for success consumer";
252 private static final String OP_EXEC_ERR =
253 "Error when executing VPLS operation {}, error: {}";
254
255 /**
256 * Process a batch of VPLS operations.
257 */
258 @Override
259 public void run() {
260 Set<String> vplsNames = pendingVplsOperations.keySet();
261 vplsNames.forEach(vplsName -> {
262 VplsOperation operation;
263 synchronized (runningOperations) {
264 // Only one operation for a VPLS at the same time
265 if (runningOperations.containsKey(vplsName)) {
266 return;
267 }
268 Deque<VplsOperation> operations = pendingVplsOperations.remove(vplsName);
269 operation = getOptimizedVplsOperation(operations);
270 if (operation == null) {
271 // Nothing to do, this only happened when we add a VPLS
272 // and remove it before batch operations been processed.
273 return;
274 }
275 runningOperations.put(vplsName, operation);
276 }
277
278 VplsOperationExecutor operationExecutor =
279 new VplsOperationExecutor(operation);
280 operationExecutor.setConsumers(
281 (vplsOperation) -> {
282 // Success consumer
283 VplsData vplsData = vplsOperation.vpls();
284 log.debug("VPLS operation success: {}", vplsOperation);
285 switch (vplsData.state()) {
286 case ADDING:
287 case UPDATING:
288 vplsData.state(VplsData.VplsState.ADDED);
289 vplsStore.updateVpls(vplsData);
290 break;
291 case REMOVING:
292 // The VPLS information does not exists in
293 // store. No need to update the store.
294 break;
295 default:
296 log.warn(UNKNOWN_STATE, vplsData.state());
297 vplsData.state(VplsData.VplsState.FAILED);
298 vplsStore.updateVpls(vplsData);
299 break;
300 }
301 runningOperations.remove(vplsName);
302 },
303 (vplsOperationException) -> {
304 // Error consumer
305 VplsOperation vplsOperation =
306 vplsOperationException.vplsOperation();
Yi Tsengada511f2017-05-18 14:46:04 -0700307 log.warn(OP_EXEC_ERR,
308 vplsOperation.toString(),
309 vplsOperationException.getMessage());
Yi Tsengf4e13e32017-03-30 15:38:39 -0700310 VplsData vplsData = vplsOperation.vpls();
311 vplsData.state(VplsData.VplsState.FAILED);
312 vplsStore.updateVpls(vplsData);
Yi Tsengf4e13e32017-03-30 15:38:39 -0700313 runningOperations.remove(vplsName);
314 });
315 log.debug("Applying operation: {}", operation);
316 workerExecutor.execute(operationExecutor);
317 });
318 }
319 }
320
321 /**
322 * Direction for Intent installation.
323 */
324 private enum Direction {
325 ADD,
326 REMOVE
327 }
328
329 /**
330 * VPLS operation executor.
331 * Installs, updates or removes Intents according to the given VPLS operation.
332 */
333 class VplsOperationExecutor implements Runnable {
private static final String UNKNOWN_OP = "Unknown operation.";
private static final String UNKNOWN_INTENT_DIR = "Unknown Intent install direction.";

/** Seconds to wait for a batch of Intent events before giving up. */
private static final int OPERATION_TIMEOUT = 10;

private VplsOperation vplsOperation;
private Consumer<VplsOperation> successConsumer;
private Consumer<VplsOperationException> errorConsumer;

// Stays null while the operation is healthy; set by the first failing step.
private VplsOperationException error;

/**
 * Creates an executor for the given VPLS operation.
 *
 * @param vplsOperation the operation to execute
 */
public VplsOperationExecutor(VplsOperation vplsOperation) {
    this.error = null;
    this.vplsOperation = vplsOperation;
}
346
347 /**
348 * Sets success consumer and error consumer for this executor.
349 *
350 * @param successConsumer the success consumer
351 * @param errorConsumer the error consumer
352 */
353 public void setConsumers(Consumer<VplsOperation> successConsumer,
354 Consumer<VplsOperationException> errorConsumer) {
355 this.successConsumer = successConsumer;
356 this.errorConsumer = errorConsumer;
357
358 }
359
360 @Override
361 public void run() {
362 switch (vplsOperation.op()) {
363 case ADD:
364 installVplsIntents();
365 break;
366 case REMOVE:
367 removeVplsIntents();
368 break;
369 case UPDATE:
370 updateVplsIntents();
371 break;
372 default:
373 this.error = new VplsOperationException(vplsOperation,
374 UNKNOWN_OP);
375 break;
376 }
377
378 if (this.error != null) {
379 errorConsumer.accept(this.error);
380 } else {
381 successConsumer.accept(vplsOperation);
382 }
383 }
384
385 /**
386 * Updates Intents of the VPLS.
387 */
388 private void updateVplsIntents() {
389 // check which part we need to update
390 // if we update host only, we don't need to reinstall
391 // every Intents
392 Set<Intent> intentsToInstall = Sets.newHashSet();
393 Set<Intent> intentsToUninstall = Sets.newHashSet();
394 VplsData vplsData = vplsOperation.vpls();
395 Set<Intent> currentIntents = getCurrentIntents();
396
397 // Compares broadcast Intents
398 Set<Intent> currentBrcIntents = currentIntents.stream()
399 .filter(intent -> intent instanceof SinglePointToMultiPointIntent)
400 .collect(Collectors.toSet());
401 Set<Intent> targetBrcIntents = VplsIntentUtility.buildBrcIntents(vplsData, appId);
402 if (!intentSetEquals(currentBrcIntents, targetBrcIntents)) {
403 // If broadcast Intents changes, it means some network
404 // interfaces or encapsulation constraint changed; Need to
405 // reinstall all intents
406 removeVplsIntents();
407 installVplsIntents();
408 return;
409 }
410
411 // Compares unicast Intents
412 Set<Intent> currentUniIntents = currentIntents.stream()
413 .filter(intent -> intent instanceof MultiPointToSinglePointIntent)
414 .collect(Collectors.toSet());
415 Set<Intent> targetUniIntents = VplsIntentUtility.buildUniIntents(vplsData,
416 hostsFromVpls(),
417 appId);
418
419 // New unicast Intents to install
420 targetUniIntents.forEach(intent -> {
421 if (!currentUniIntents.contains(intent)) {
422 intentsToInstall.add(intent);
423 }
424 });
425
426 // Old unicast Intents to remove
427 currentUniIntents.forEach(intent -> {
428 if (!targetUniIntents.contains(intent)) {
429 intentsToUninstall.add(intent);
430 }
431 });
432 applyIntentsSync(intentsToUninstall, Direction.REMOVE);
433 applyIntentsSync(intentsToInstall, Direction.ADD);
434 }
435
436 private Set<Host> hostsFromVpls() {
437 VplsData vplsData = vplsOperation.vpls();
438 Set<Interface> interfaces = vplsData.interfaces();
439 return interfaces.stream()
440 .map(this::hostsFromInterface)
441 .flatMap(Collection::stream)
442 .collect(Collectors.toSet());
443 }
444
445 private Set<Host> hostsFromInterface(Interface iface) {
446 return hostService.getConnectedHosts(iface.connectPoint())
447 .stream()
448 .filter(host -> host.vlan().equals(iface.vlan()))
449 .collect(Collectors.toSet());
450 }
451
452 /**
453 * Applies Intents synchronously with a specific direction.
454 *
455 * @param intents the Intents
456 * @param direction the direction
457 */
458 private void applyIntentsSync(Set<Intent> intents, Direction direction) {
459 Set<Key> pendingIntentKeys = intents.stream()
460 .map(Intent::key).collect(Collectors.toSet());
461 IntentCompleter completer;
462
463 switch (direction) {
464 case ADD:
465 completer = new IntentCompleter(pendingIntentKeys,
466 IntentEvent.Type.INSTALLED);
467 intentService.addListener(completer);
468 intents.forEach(intentService::submit);
469 break;
470 case REMOVE:
471 completer = new IntentCompleter(pendingIntentKeys,
472 IntentEvent.Type.WITHDRAWN);
473 intentService.addListener(completer);
474 intents.forEach(intentService::withdraw);
475 break;
476 default:
477 this.error = new VplsOperationException(this.vplsOperation,
478 UNKNOWN_INTENT_DIR);
479 return;
480 }
481
482 try {
483 // Wait until Intent operation completed
484 completer.complete();
485 } catch (VplsOperationException e) {
486 this.error = e;
487 } finally {
488 intentService.removeListener(completer);
489 }
490 }
491
492 /**
493 * Checks if two sets of Intents are equal.
494 *
495 * @param intentSet1 the first set of Intents
496 * @param intentSet2 the second set of Intents
497 * @return true if both set of Intents are equal; otherwise false
498 */
499 private boolean intentSetEquals(Set<Intent> intentSet1, Set<Intent> intentSet2) {
500 if (intentSet1.size() != intentSet2.size()) {
501 return false;
502 }
503 for (Intent intent1 : intentSet1) {
504 if (intentSet2.stream()
505 .noneMatch(intent2 -> IntentUtils.intentsAreEqual(intent1, intent2))) {
506 return false;
507 }
508 }
509 return true;
510 }
511
512 /**
513 * Retrieves installed Intents from IntentService which related to
514 * specific VPLS.
515 *
516 * @return the Intents which related to the VPLS
517 */
518 private Set<Intent> getCurrentIntents() {
519 VplsData vplsData = vplsOperation.vpls();
520 String vplsName = vplsData.name();
521 return Tools.stream(intentService.getIntents())
522 .filter(intent -> intent.key().toString().startsWith(vplsName))
523 .collect(Collectors.toSet());
524 }
525
526 /**
527 * Generates unicast Intents and broadcast Intents for the VPLS.
528 *
529 * @return Intents for the VPLS
530 */
531 private Set<Intent> generateVplsIntents() {
532 VplsData vplsData = vplsOperation.vpls();
533 Set<Intent> brcIntents = VplsIntentUtility.buildBrcIntents(vplsData, appId);
534 Set<Intent> uniIntent = VplsIntentUtility.buildUniIntents(vplsData, hostsFromVpls(), appId);
535
536 return Stream.concat(brcIntents.stream(), uniIntent.stream())
537 .collect(Collectors.toSet());
538 }
539
540 /**
541 * Removes all Intents from the VPLS.
542 */
543 private void removeVplsIntents() {
544 Set<Intent> intentsToWithdraw = getCurrentIntents();
545 applyIntentsSync(intentsToWithdraw, Direction.REMOVE);
Yi Tsengada511f2017-05-18 14:46:04 -0700546 intentsToWithdraw.forEach(intentService::purge);
Yi Tsengf4e13e32017-03-30 15:38:39 -0700547 }
548
549 /**
550 * Installs Intents of the VPLS.
551 */
552 private void installVplsIntents() {
553 Set<Intent> intentsToInstall = generateVplsIntents();
554 applyIntentsSync(intentsToInstall, Direction.ADD);
555 }
556
557 /**
558 * Helper class which monitors if all Intent operations are completed.
559 */
560 class IntentCompleter implements IntentListener {
561 private static final String INTENT_COMPILE_ERR = "Got {} from intent completer";
562 private CompletableFuture<Void> completableFuture;
563 private Set<Key> pendingIntentKeys;
564 private IntentEvent.Type expectedEventType;
565
566 /**
567 * Initialize completer with given Intent keys and expect Intent
568 * event type.
569 *
570 * @param pendingIntentKeys the Intent keys to wait
571 * @param expectedEventType expect Intent event type
572 */
Yi Tseng0ee6aa22017-05-19 14:43:27 -0700573 public IntentCompleter(Set<Key> pendingIntentKeys,
574 IntentEvent.Type expectedEventType) {
Yi Tsengf4e13e32017-03-30 15:38:39 -0700575 this.completableFuture = new CompletableFuture<>();
576 this.pendingIntentKeys = Sets.newConcurrentHashSet(pendingIntentKeys);
577 this.expectedEventType = expectedEventType;
578 }
579
580 @Override
581 public void event(IntentEvent event) {
582 Intent intent = event.subject();
Yi Tseng0ee6aa22017-05-19 14:43:27 -0700583 Key key = intent.key();
584 if (!pendingIntentKeys.contains(key)) {
585 // ignore Intent events from other VPLS
586 return;
587 }
Yi Tsengf4e13e32017-03-30 15:38:39 -0700588 // Intent failed, throw an exception to completable future
589 if (event.type() == IntentEvent.Type.CORRUPT ||
590 event.type() == IntentEvent.Type.FAILED) {
591 completableFuture.completeExceptionally(new IntentException(intent.toString()));
592 return;
593 }
594 // If event type matched to expected type, remove from pending
595 if (event.type() == expectedEventType) {
Yi Tsengf4e13e32017-03-30 15:38:39 -0700596 pendingIntentKeys.remove(key);
597 }
598 if (pendingIntentKeys.isEmpty()) {
599 completableFuture.complete(null);
600 }
601 }
602
603 /**
604 * Waits until all pending Intents completed ot timeout.
605 */
606 public void complete() {
607 // If no pending Intent keys, complete directly
608 if (pendingIntentKeys.isEmpty()) {
609 return;
610 }
611 try {
612 completableFuture.get(OPERATION_TIMEOUT, TimeUnit.SECONDS);
613 } catch (TimeoutException | InterruptedException |
614 ExecutionException | IntentException e) {
615 // TODO: handle errors more carefully
616 log.warn(INTENT_COMPILE_ERR, e.toString());
617 throw new VplsOperationException(vplsOperation, e.toString());
618 }
619 }
620 }
621 }
622
623 /**
624 * A listener for leadership events.
625 * Only the leader can process VPLS operation in the ONOS cluster.
626 */
627 private class InternalLeadershipListener implements LeadershipEventListener {
628 private static final String LEADER_CHANGE = "Change leader to {}";
629
630 @Override
631 public void event(LeadershipEvent event) {
632 switch (event.type()) {
633 case LEADER_CHANGED:
634 case LEADER_AND_CANDIDATES_CHANGED:
635 isLeader = localNodeId.equals(event.subject().leaderNodeId());
636 if (isLeader) {
637 log.debug(LEADER_CHANGE, localNodeId);
638 }
639 break;
640 default:
641 break;
642 }
643 }
644
645 @Override
646 public boolean isRelevant(LeadershipEvent event) {
647 return event.subject().topic().equals(appId.name());
648 }
649 }
650}