blob: 4ffa4af01201de20bb7d2e7fcccedb05c99a1f07 [file] [log] [blame]
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.stc;
import com.google.common.collect.Sets;
import java.io.File;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.stc.Coordinator.Directive.*;
import static org.onlab.stc.Coordinator.Status.*;
/**
* Coordinates execution of a scenario process flow.
*/
public class Coordinator {
private static final int MAX_THREADS = 16;
private final ExecutorService executor = newFixedThreadPool(MAX_THREADS);
private final Scenario scenario;
private final ProcessFlow processFlow;
private final StepProcessListener delegate;
private final CountDownLatch latch;
private final ScenarioStore store;
private final Set<StepProcessListener> listeners = Sets.newConcurrentHashSet();
private File logDir;
/**
* Represents action to be taken on a test step.
*/
public enum Directive {
NOOP, RUN, SKIP
}
/**
* Represents processor state.
*/
public enum Status {
WAITING, IN_PROGRESS, SUCCEEDED, FAILED
}
/**
* Creates a process flow coordinator.
*
* @param scenario test scenario to coordinate
* @param processFlow process flow to coordinate
* @param logDir scenario log directory
*/
public Coordinator(Scenario scenario, ProcessFlow processFlow, File logDir) {
this.scenario = scenario;
this.processFlow = processFlow;
this.logDir = logDir;
this.store = new ScenarioStore(processFlow, logDir, scenario.name());
this.delegate = new Delegate();
this.latch = new CountDownLatch(store.getSteps().size());
}
/**
* Starts execution of the process flow graph.
*/
public void start() {
executeRoots(null);
}
/**
* Wants for completion of the entire process flow.
*
* @return exit code to use
* @throws InterruptedException if interrupted while waiting for completion
*/
public int waitFor() throws InterruptedException {
latch.await();
return store.hasFailures() ? 1 : 0;
}
/**
* Returns set of all test steps.
*
* @return set of steps
*/
public Set<Step> getSteps() {
return store.getSteps();
}
/**
* Returns the status of the specified test step.
*
* @param step test step or group
* @return step status
*/
public Status getStatus(Step step) {
return store.getStatus(step);
}
/**
* Adds the specified listener.
*
* @param listener step process listener
*/
public void addListener(StepProcessListener listener) {
listeners.add(checkNotNull(listener, "Listener cannot be null"));
}
/**
* Removes the specified listener.
*
* @param listener step process listener
*/
public void removeListener(StepProcessListener listener) {
listeners.remove(checkNotNull(listener, "Listener cannot be null"));
}
/**
* Executes the set of roots in the scope of the specified group or globally
* if no group is given.
*
* @param group optional group
*/
private void executeRoots(Group group) {
Set<Step> steps =
group != null ? group.children() : processFlow.getVertexes();
steps.forEach(step -> {
if (processFlow.getEdgesFrom(step).isEmpty() && step.group() == group) {
execute(step);
}
});
}
/**
* Executes the specified step.
*
* @param step step to execute
*/
private synchronized void execute(Step step) {
Directive directive = nextAction(step);
if (directive == RUN || directive == SKIP) {
store.updateStatus(step, IN_PROGRESS);
if (step instanceof Group) {
Group group = (Group) step;
delegate.onStart(group);
if (directive == RUN) {
executeRoots(group);
} else {
group.children().forEach(child -> delegate.onCompletion(child, 1));
}
} else {
executor.execute(new StepProcessor(step, directive == SKIP,
logDir, delegate));
}
}
}
/**
* Determines the state of the specified step.
*
* @param step test step
* @return state of the step process
*/
private Directive nextAction(Step step) {
Status status = store.getStatus(step);
if (status != WAITING) {
return NOOP;
}
for (Dependency dependency : processFlow.getEdgesFrom(step)) {
Status depStatus = store.getStatus(dependency.dst());
if (depStatus == WAITING || depStatus == IN_PROGRESS) {
return NOOP;
} else if (depStatus == FAILED && !dependency.isSoft()) {
return SKIP;
}
}
return RUN;
}
/**
* Executes the successors to the specified step.
*
* @param step step whose successors are to be executed
*/
private void executeSucessors(Step step) {
processFlow.getEdgesTo(step).forEach(dependency -> execute(dependency.src()));
completeParentIfNeeded(step.group());
}
/**
* Checks whether the specified parent group, if any, should be marked
* as complete.
*
* @param group parent group that should be checked
*/
private synchronized void completeParentIfNeeded(Group group) {
if (group != null && getStatus(group) == IN_PROGRESS) {
boolean done = true;
boolean failed = false;
for (Step child : group.children()) {
Status status = store.getStatus(child);
done = done && (status == SUCCEEDED || status == FAILED);
failed = failed || status == FAILED;
}
if (done) {
delegate.onCompletion(group, failed ? 1 : 0);
}
}
}
/**
* Prints formatted output.
*
* @param format printf format string
* @param args arguments to be printed
*/
public static void print(String format, Object... args) {
System.out.println(String.format(format, args));
}
/**
* Internal delegate to monitor the process execution.
*/
private class Delegate implements StepProcessListener {
@Override
public void onStart(Step step) {
listeners.forEach(listener -> listener.onStart(step));
}
@Override
public void onCompletion(Step step, int exitCode) {
store.updateStatus(step, exitCode == 0 ? SUCCEEDED : FAILED);
listeners.forEach(listener -> listener.onCompletion(step, exitCode));
executeSucessors(step);
latch.countDown();
}
@Override
public void onOutput(Step step, String line) {
listeners.forEach(listener -> listener.onOutput(step, line));
}
}
}