| /* |
| * Copyright 2017-present Open Networking Laboratory |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.onosproject.vpls; |
| |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Queues; |
| import com.google.common.collect.Sets; |
| import org.apache.felix.scr.annotations.Activate; |
| import org.apache.felix.scr.annotations.Component; |
| import org.apache.felix.scr.annotations.Deactivate; |
| import org.apache.felix.scr.annotations.Reference; |
| import org.apache.felix.scr.annotations.ReferenceCardinality; |
| import org.apache.felix.scr.annotations.Service; |
| import org.onlab.util.Tools; |
| import org.onosproject.cluster.ClusterService; |
| import org.onosproject.cluster.LeadershipEvent; |
| import org.onosproject.cluster.LeadershipEventListener; |
| import org.onosproject.cluster.LeadershipService; |
| import org.onosproject.cluster.NodeId; |
| import org.onosproject.core.ApplicationId; |
| import org.onosproject.core.CoreService; |
| import org.onosproject.incubator.net.intf.Interface; |
| import org.onosproject.net.Host; |
| import org.onosproject.net.host.HostService; |
| import org.onosproject.net.intent.Intent; |
| import org.onosproject.net.intent.IntentEvent; |
| import org.onosproject.net.intent.IntentException; |
| import org.onosproject.net.intent.IntentListener; |
| import org.onosproject.net.intent.IntentService; |
| import org.onosproject.net.intent.IntentUtils; |
| import org.onosproject.net.intent.Key; |
| import org.onosproject.net.intent.MultiPointToSinglePointIntent; |
| import org.onosproject.net.intent.SinglePointToMultiPointIntent; |
| import org.onosproject.vpls.api.VplsData; |
| import org.onosproject.vpls.api.VplsOperationException; |
| import org.onosproject.vpls.api.VplsOperationService; |
| import org.onosproject.vpls.api.VplsOperation; |
| import org.onosproject.vpls.api.VplsStore; |
| import org.onosproject.vpls.intent.VplsIntentUtility; |
| import org.slf4j.Logger; |
| |
| import java.util.Collection; |
| import java.util.Deque; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.function.Consumer; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import static org.onlab.util.BoundedThreadPool.newFixedThreadPool; |
| import static org.onlab.util.Tools.groupedThreads; |
| import static org.slf4j.LoggerFactory.getLogger; |
| |
| /** |
| * An implementation of VplsOperationService. |
| * Handles the execution order of the VPLS operations generated by the |
| * application. |
| */ |
| @Component(immediate = true) |
| @Service |
| public class VplsOperationManager implements VplsOperationService { |
| private static final int NUM_THREADS = 4; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| protected CoreService coreService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| protected IntentService intentService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| protected LeadershipService leadershipService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| protected ClusterService clusterService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| protected HostService hostService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| protected VplsStore vplsStore; |
| |
| private final Logger log = getLogger(getClass()); |
| protected Map<String, Deque<VplsOperation>> pendingVplsOperations; |
| protected final Map<String, VplsOperation> runningOperations = Maps.newHashMap(); |
| protected ScheduledExecutorService schedulerExecutor; |
| protected ExecutorService workerExecutor; |
| protected ApplicationId appId; |
| protected boolean isLeader; |
| protected NodeId localNodeId; |
| protected LeadershipEventListener leadershipEventListener; |
| |
| @Activate |
| public void activate() { |
| appId = coreService.registerApplication(VplsManager.VPLS_APP); |
| localNodeId = clusterService.getLocalNode().id(); |
| |
| leadershipEventListener = new InternalLeadershipListener(); |
| leadershipService.addListener(leadershipEventListener); |
| leadershipService.runForLeadership(appId.name()); |
| pendingVplsOperations = Maps.newConcurrentMap(); |
| |
| // Thread pool for VplsOperationExecutor |
| workerExecutor = newFixedThreadPool(NUM_THREADS, |
| groupedThreads("onos/apps/vpls", |
| "worker-%d", |
| log)); |
| // A single thread pool for VplsOperationScheduler |
| schedulerExecutor = Executors.newScheduledThreadPool(1, |
| groupedThreads("onos/apps/vpls", |
| "scheduler-%d", |
| log)); |
| // Start the scheduler |
| schedulerExecutor.scheduleAtFixedRate(new VplsOperationScheduler(), |
| 0, |
| 500, |
| TimeUnit.MILLISECONDS); |
| |
| } |
| |
| @Deactivate |
| public void deactivate() { |
| pendingVplsOperations.clear(); |
| runningOperations.clear(); |
| leadershipService.removeListener(leadershipEventListener); |
| schedulerExecutor.shutdown(); |
| workerExecutor.shutdown(); |
| |
| // remove all intents from VPLS application when deactivated |
| Tools.stream(intentService.getIntents()) |
| .filter(intent -> intent.appId().equals(appId)) |
| .forEach(intentService::withdraw); |
| } |
| |
| @Override |
| public void submit(VplsOperation vplsOperation) { |
| if (isLeader) { |
| // Only leader can execute operation |
| addVplsOperation(vplsOperation); |
| } |
| } |
| |
| /** |
| * Adds a VPLS operation to the queue of pending operations. |
| * |
| * @param vplsOperation the VPLS operation to add |
| */ |
| private void addVplsOperation(VplsOperation vplsOperation) { |
| VplsData vplsData = vplsOperation.vpls(); |
| pendingVplsOperations.compute(vplsData.name(), (name, opQueue) -> { |
| opQueue = opQueue == null ? Queues.newArrayDeque() : opQueue; |
| |
| // If the operation already exist in queue, ignore it. |
| if (opQueue.contains(vplsOperation)) { |
| return opQueue; |
| } |
| opQueue.add(vplsOperation); |
| return opQueue; |
| }); |
| } |
| |
| /** |
| * Optimizes the VPLS operation queue and return a single VPLS operation to |
| * execute. |
| * |
| * @param operations the queue to be optimized |
| * @return optimized VPLS operation from the queue |
| */ |
| protected static VplsOperation getOptimizedVplsOperation(Deque<VplsOperation> operations) { |
| if (operations.isEmpty()) { |
| return null; |
| } |
| // no need to optimize if the queue contains only one operation |
| if (operations.size() == 1) { |
| return operations.getFirst(); |
| } |
| |
| final VplsOperation firstOperation = operations.peekFirst(); |
| final VplsOperation lastOperation = operations.peekLast(); |
| final VplsOperation.Operation firstOp = firstOperation.op(); |
| final VplsOperation.Operation lastOp = lastOperation.op(); |
| |
| if (firstOp.equals(VplsOperation.Operation.REMOVE)) { |
| if (lastOp.equals(VplsOperation.Operation.REMOVE)) { |
| // case 1: both first and last operation are REMOVE; do remove |
| return firstOperation; |
| } else if (lastOp.equals(VplsOperation.Operation.ADD)) { |
| // case 2: if first is REMOVE, and last is ADD; do update |
| return VplsOperation.of(lastOperation.vpls(), |
| VplsOperation.Operation.UPDATE); |
| } else { |
| // case 3: first is REMOVE, last is UPDATE; do update |
| return lastOperation; |
| } |
| } else if (firstOp.equals(VplsOperation.Operation.ADD)) { |
| if (lastOp.equals(VplsOperation.Operation.REMOVE)) { |
| // case 4: first is ADD, last is REMOVE; nothing to do |
| return null; |
| } else if (lastOp.equals(VplsOperation.Operation.ADD)) { |
| // case 5: both first and last are ADD, do add |
| return VplsOperation.of(lastOperation.vpls(), |
| VplsOperation.Operation.ADD); |
| } else { |
| // case 6: first is ADD and last is update, do add |
| return VplsOperation.of(lastOperation.vpls(), |
| VplsOperation.Operation.ADD); |
| } |
| } else { |
| if (lastOp.equals(VplsOperation.Operation.REMOVE)) { |
| // case 7: last is remove, do remove |
| return lastOperation; |
| } else if (lastOp.equals(VplsOperation.Operation.ADD)) { |
| // case 8: do update only |
| return VplsOperation.of(lastOperation.vpls(), |
| VplsOperation.Operation.UPDATE); |
| } else { |
| // case 9: from UPDATE to UPDATE |
| // only need last UPDATE operation |
| return VplsOperation.of(lastOperation.vpls(), |
| VplsOperation.Operation.UPDATE); |
| } |
| } |
| } |
| |
| /** |
| * Scheduler for VPLS operation. |
| * Processes a batch of VPLS operations in a period. |
| */ |
| class VplsOperationScheduler implements Runnable { |
| private static final String UNKNOWN_STATE = |
| "Unknown state {} for success consumer"; |
| private static final String OP_EXEC_ERR = |
| "Error when executing VPLS operation {}, error: {}"; |
| |
| /** |
| * Process a batch of VPLS operations. |
| */ |
| @Override |
| public void run() { |
| Set<String> vplsNames = pendingVplsOperations.keySet(); |
| vplsNames.forEach(vplsName -> { |
| VplsOperation operation; |
| synchronized (runningOperations) { |
| // Only one operation for a VPLS at the same time |
| if (runningOperations.containsKey(vplsName)) { |
| return; |
| } |
| Deque<VplsOperation> operations = pendingVplsOperations.remove(vplsName); |
| operation = getOptimizedVplsOperation(operations); |
| if (operation == null) { |
| // Nothing to do, this only happened when we add a VPLS |
| // and remove it before batch operations been processed. |
| return; |
| } |
| runningOperations.put(vplsName, operation); |
| } |
| |
| VplsOperationExecutor operationExecutor = |
| new VplsOperationExecutor(operation); |
| operationExecutor.setConsumers( |
| (vplsOperation) -> { |
| // Success consumer |
| VplsData vplsData = vplsOperation.vpls(); |
| log.debug("VPLS operation success: {}", vplsOperation); |
| switch (vplsData.state()) { |
| case ADDING: |
| case UPDATING: |
| vplsData.state(VplsData.VplsState.ADDED); |
| vplsStore.updateVpls(vplsData); |
| break; |
| case REMOVING: |
| // The VPLS information does not exists in |
| // store. No need to update the store. |
| break; |
| default: |
| log.warn(UNKNOWN_STATE, vplsData.state()); |
| vplsData.state(VplsData.VplsState.FAILED); |
| vplsStore.updateVpls(vplsData); |
| break; |
| } |
| runningOperations.remove(vplsName); |
| }, |
| (vplsOperationException) -> { |
| // Error consumer |
| VplsOperation vplsOperation = |
| vplsOperationException.vplsOperation(); |
| log.debug("VPLS operation failed: {}", vplsOperation); |
| VplsData vplsData = vplsOperation.vpls(); |
| vplsData.state(VplsData.VplsState.FAILED); |
| vplsStore.updateVpls(vplsData); |
| log.error(OP_EXEC_ERR, |
| vplsOperation.toString(), |
| vplsOperationException.getMessage()); |
| runningOperations.remove(vplsName); |
| }); |
| log.debug("Applying operation: {}", operation); |
| workerExecutor.execute(operationExecutor); |
| }); |
| } |
| } |
| |
| /** |
| * Direction for Intent installation. |
| */ |
| private enum Direction { |
| ADD, |
| REMOVE |
| } |
| |
| /** |
| * VPLS operation executor. |
| * Installs, updates or removes Intents according to the given VPLS operation. |
| */ |
| class VplsOperationExecutor implements Runnable { |
| private static final String UNKNOWN_OP = "Unknown operation."; |
| private static final String UNKNOWN_INTENT_DIR = "Unknown Intent install direction."; |
| private static final int OPERATION_TIMEOUT = 10; |
| private VplsOperation vplsOperation; |
| private Consumer<VplsOperation> successConsumer; |
| private Consumer<VplsOperationException> errorConsumer; |
| private VplsOperationException error; |
| |
| public VplsOperationExecutor(VplsOperation vplsOperation) { |
| this.vplsOperation = vplsOperation; |
| this.error = null; |
| } |
| |
| /** |
| * Sets success consumer and error consumer for this executor. |
| * |
| * @param successConsumer the success consumer |
| * @param errorConsumer the error consumer |
| */ |
| public void setConsumers(Consumer<VplsOperation> successConsumer, |
| Consumer<VplsOperationException> errorConsumer) { |
| this.successConsumer = successConsumer; |
| this.errorConsumer = errorConsumer; |
| |
| } |
| |
| @Override |
| public void run() { |
| switch (vplsOperation.op()) { |
| case ADD: |
| installVplsIntents(); |
| break; |
| case REMOVE: |
| removeVplsIntents(); |
| break; |
| case UPDATE: |
| updateVplsIntents(); |
| break; |
| default: |
| this.error = new VplsOperationException(vplsOperation, |
| UNKNOWN_OP); |
| break; |
| } |
| |
| if (this.error != null) { |
| errorConsumer.accept(this.error); |
| } else { |
| successConsumer.accept(vplsOperation); |
| } |
| } |
| |
| /** |
| * Updates Intents of the VPLS. |
| */ |
| private void updateVplsIntents() { |
| // check which part we need to update |
| // if we update host only, we don't need to reinstall |
| // every Intents |
| Set<Intent> intentsToInstall = Sets.newHashSet(); |
| Set<Intent> intentsToUninstall = Sets.newHashSet(); |
| VplsData vplsData = vplsOperation.vpls(); |
| Set<Intent> currentIntents = getCurrentIntents(); |
| |
| // Compares broadcast Intents |
| Set<Intent> currentBrcIntents = currentIntents.stream() |
| .filter(intent -> intent instanceof SinglePointToMultiPointIntent) |
| .collect(Collectors.toSet()); |
| Set<Intent> targetBrcIntents = VplsIntentUtility.buildBrcIntents(vplsData, appId); |
| if (!intentSetEquals(currentBrcIntents, targetBrcIntents)) { |
| // If broadcast Intents changes, it means some network |
| // interfaces or encapsulation constraint changed; Need to |
| // reinstall all intents |
| removeVplsIntents(); |
| installVplsIntents(); |
| return; |
| } |
| |
| // Compares unicast Intents |
| Set<Intent> currentUniIntents = currentIntents.stream() |
| .filter(intent -> intent instanceof MultiPointToSinglePointIntent) |
| .collect(Collectors.toSet()); |
| Set<Intent> targetUniIntents = VplsIntentUtility.buildUniIntents(vplsData, |
| hostsFromVpls(), |
| appId); |
| |
| // New unicast Intents to install |
| targetUniIntents.forEach(intent -> { |
| if (!currentUniIntents.contains(intent)) { |
| intentsToInstall.add(intent); |
| } |
| }); |
| |
| // Old unicast Intents to remove |
| currentUniIntents.forEach(intent -> { |
| if (!targetUniIntents.contains(intent)) { |
| intentsToUninstall.add(intent); |
| } |
| }); |
| applyIntentsSync(intentsToUninstall, Direction.REMOVE); |
| applyIntentsSync(intentsToInstall, Direction.ADD); |
| } |
| |
| private Set<Host> hostsFromVpls() { |
| VplsData vplsData = vplsOperation.vpls(); |
| Set<Interface> interfaces = vplsData.interfaces(); |
| return interfaces.stream() |
| .map(this::hostsFromInterface) |
| .flatMap(Collection::stream) |
| .collect(Collectors.toSet()); |
| } |
| |
| private Set<Host> hostsFromInterface(Interface iface) { |
| return hostService.getConnectedHosts(iface.connectPoint()) |
| .stream() |
| .filter(host -> host.vlan().equals(iface.vlan())) |
| .collect(Collectors.toSet()); |
| } |
| |
| /** |
| * Applies Intents synchronously with a specific direction. |
| * |
| * @param intents the Intents |
| * @param direction the direction |
| */ |
| private void applyIntentsSync(Set<Intent> intents, Direction direction) { |
| Set<Key> pendingIntentKeys = intents.stream() |
| .map(Intent::key).collect(Collectors.toSet()); |
| IntentCompleter completer; |
| |
| switch (direction) { |
| case ADD: |
| completer = new IntentCompleter(pendingIntentKeys, |
| IntentEvent.Type.INSTALLED); |
| intentService.addListener(completer); |
| intents.forEach(intentService::submit); |
| break; |
| case REMOVE: |
| completer = new IntentCompleter(pendingIntentKeys, |
| IntentEvent.Type.WITHDRAWN); |
| intentService.addListener(completer); |
| intents.forEach(intentService::withdraw); |
| break; |
| default: |
| this.error = new VplsOperationException(this.vplsOperation, |
| UNKNOWN_INTENT_DIR); |
| return; |
| } |
| |
| try { |
| // Wait until Intent operation completed |
| completer.complete(); |
| } catch (VplsOperationException e) { |
| this.error = e; |
| } finally { |
| intentService.removeListener(completer); |
| } |
| } |
| |
| /** |
| * Checks if two sets of Intents are equal. |
| * |
| * @param intentSet1 the first set of Intents |
| * @param intentSet2 the second set of Intents |
| * @return true if both set of Intents are equal; otherwise false |
| */ |
| private boolean intentSetEquals(Set<Intent> intentSet1, Set<Intent> intentSet2) { |
| if (intentSet1.size() != intentSet2.size()) { |
| return false; |
| } |
| for (Intent intent1 : intentSet1) { |
| if (intentSet2.stream() |
| .noneMatch(intent2 -> IntentUtils.intentsAreEqual(intent1, intent2))) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Retrieves installed Intents from IntentService which related to |
| * specific VPLS. |
| * |
| * @return the Intents which related to the VPLS |
| */ |
| private Set<Intent> getCurrentIntents() { |
| VplsData vplsData = vplsOperation.vpls(); |
| String vplsName = vplsData.name(); |
| return Tools.stream(intentService.getIntents()) |
| .filter(intent -> intent.key().toString().startsWith(vplsName)) |
| .collect(Collectors.toSet()); |
| } |
| |
| /** |
| * Generates unicast Intents and broadcast Intents for the VPLS. |
| * |
| * @return Intents for the VPLS |
| */ |
| private Set<Intent> generateVplsIntents() { |
| VplsData vplsData = vplsOperation.vpls(); |
| Set<Intent> brcIntents = VplsIntentUtility.buildBrcIntents(vplsData, appId); |
| Set<Intent> uniIntent = VplsIntentUtility.buildUniIntents(vplsData, hostsFromVpls(), appId); |
| |
| return Stream.concat(brcIntents.stream(), uniIntent.stream()) |
| .collect(Collectors.toSet()); |
| } |
| |
| /** |
| * Removes all Intents from the VPLS. |
| */ |
| private void removeVplsIntents() { |
| Set<Intent> intentsToWithdraw = getCurrentIntents(); |
| applyIntentsSync(intentsToWithdraw, Direction.REMOVE); |
| } |
| |
| /** |
| * Installs Intents of the VPLS. |
| */ |
| private void installVplsIntents() { |
| Set<Intent> intentsToInstall = generateVplsIntents(); |
| applyIntentsSync(intentsToInstall, Direction.ADD); |
| } |
| |
| /** |
| * Helper class which monitors if all Intent operations are completed. |
| */ |
| class IntentCompleter implements IntentListener { |
| private static final String INTENT_COMPILE_ERR = "Got {} from intent completer"; |
| private CompletableFuture<Void> completableFuture; |
| private Set<Key> pendingIntentKeys; |
| private IntentEvent.Type expectedEventType; |
| |
| /** |
| * Initialize completer with given Intent keys and expect Intent |
| * event type. |
| * |
| * @param pendingIntentKeys the Intent keys to wait |
| * @param expectedEventType expect Intent event type |
| */ |
| public IntentCompleter(Set<Key> pendingIntentKeys, IntentEvent.Type expectedEventType) { |
| this.completableFuture = new CompletableFuture<>(); |
| this.pendingIntentKeys = Sets.newConcurrentHashSet(pendingIntentKeys); |
| this.expectedEventType = expectedEventType; |
| } |
| |
| @Override |
| public void event(IntentEvent event) { |
| Intent intent = event.subject(); |
| // Intent failed, throw an exception to completable future |
| if (event.type() == IntentEvent.Type.CORRUPT || |
| event.type() == IntentEvent.Type.FAILED) { |
| completableFuture.completeExceptionally(new IntentException(intent.toString())); |
| return; |
| } |
| // If event type matched to expected type, remove from pending |
| if (event.type() == expectedEventType) { |
| Key key = intent.key(); |
| pendingIntentKeys.remove(key); |
| } |
| if (pendingIntentKeys.isEmpty()) { |
| completableFuture.complete(null); |
| } |
| } |
| |
| /** |
| * Waits until all pending Intents completed ot timeout. |
| */ |
| public void complete() { |
| // If no pending Intent keys, complete directly |
| if (pendingIntentKeys.isEmpty()) { |
| return; |
| } |
| try { |
| completableFuture.get(OPERATION_TIMEOUT, TimeUnit.SECONDS); |
| } catch (TimeoutException | InterruptedException | |
| ExecutionException | IntentException e) { |
| // TODO: handle errors more carefully |
| log.warn(INTENT_COMPILE_ERR, e.toString()); |
| throw new VplsOperationException(vplsOperation, e.toString()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * A listener for leadership events. |
| * Only the leader can process VPLS operation in the ONOS cluster. |
| */ |
| private class InternalLeadershipListener implements LeadershipEventListener { |
| private static final String LEADER_CHANGE = "Change leader to {}"; |
| |
| @Override |
| public void event(LeadershipEvent event) { |
| switch (event.type()) { |
| case LEADER_CHANGED: |
| case LEADER_AND_CANDIDATES_CHANGED: |
| isLeader = localNodeId.equals(event.subject().leaderNodeId()); |
| if (isLeader) { |
| log.debug(LEADER_CHANGE, localNodeId); |
| } |
| break; |
| default: |
| break; |
| } |
| } |
| |
| @Override |
| public boolean isRelevant(LeadershipEvent event) { |
| return event.subject().topic().equals(appId.name()); |
| } |
| } |
| } |