| /* |
| * Copyright 2015 Open Networking Laboratory |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.onlab.stc; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| import java.io.File; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static java.util.concurrent.Executors.newFixedThreadPool; |
| import static org.onlab.stc.Compiler.PROP_END; |
| import static org.onlab.stc.Compiler.PROP_START; |
| import static org.onlab.stc.Coordinator.Directive.*; |
| import static org.onlab.stc.Coordinator.Status.*; |
| |
| /** |
| * Coordinates execution of a scenario process flow. |
| */ |
| public class Coordinator { |
| |
| private static final int MAX_THREADS = 64; |
| |
| private final ExecutorService executor = newFixedThreadPool(MAX_THREADS); |
| |
| private final ProcessFlow processFlow; |
| |
| private final StepProcessListener delegate; |
| private final CountDownLatch latch; |
| private final ScenarioStore store; |
| |
| private static final Pattern PROP_ERE = Pattern.compile("^@stc ([a-zA-Z0-9_.]+)=(.*$)"); |
| private final Map<String, String> properties = Maps.newConcurrentMap(); |
| |
| private final Set<StepProcessListener> listeners = Sets.newConcurrentHashSet(); |
| private File logDir; |
| |
| /** |
| * Represents action to be taken on a test step. |
| */ |
| public enum Directive { |
| NOOP, RUN, SKIP |
| } |
| |
| /** |
| * Represents processor state. |
| */ |
| public enum Status { |
| WAITING, IN_PROGRESS, SUCCEEDED, FAILED, SKIPPED |
| } |
| |
| /** |
| * Creates a process flow coordinator. |
| * |
| * @param scenario test scenario to coordinate |
| * @param processFlow process flow to coordinate |
| * @param logDir scenario log directory |
| */ |
| public Coordinator(Scenario scenario, ProcessFlow processFlow, File logDir) { |
| this.processFlow = processFlow; |
| this.logDir = logDir; |
| this.store = new ScenarioStore(processFlow, logDir, scenario.name()); |
| this.delegate = new Delegate(); |
| this.latch = new CountDownLatch(1); |
| } |
| |
| /** |
| * Resets any previously accrued status and events. |
| */ |
| public void reset() { |
| store.reset(); |
| } |
| |
| /** |
| * Resets all previously accrued status and events for steps that lie |
| * in the range between the steps or groups whose names match the specified |
| * patterns. |
| * |
| * @param runFromPatterns list of starting step patterns |
| * @param runToPatterns list of ending step patterns |
| */ |
| public void reset(List<String> runFromPatterns, List<String> runToPatterns) { |
| List<Step> fromSteps = matchSteps(runFromPatterns); |
| List<Step> toSteps = matchSteps(runToPatterns); |
| |
| // FIXME: implement this |
| } |
| |
| /** |
| * Returns number of milliseconds it took to execute. |
| * |
| * @return number of millis elapsed during the run |
| */ |
| public long duration() { |
| return store.endTime() - store.startTime(); |
| } |
| |
| /** |
| * Returns a list of steps that match the specified list of patterns. |
| * |
| * @param runToPatterns list of patterns |
| * @return list of steps with matching names |
| */ |
| private List<Step> matchSteps(List<String> runToPatterns) { |
| ImmutableList.Builder<Step> builder = ImmutableList.builder(); |
| store.getSteps().forEach(step -> { |
| runToPatterns.forEach(p -> { |
| if (step.name().matches(p)) { |
| builder.add(step); |
| } |
| }); |
| }); |
| return builder.build(); |
| } |
| |
| /** |
| * Starts execution of the process flow graph. |
| */ |
| public void start() { |
| executeRoots(null); |
| } |
| |
| /** |
| * Waits for completion of the entire process flow. |
| * |
| * @return exit code to use |
| * @throws InterruptedException if interrupted while waiting for completion |
| */ |
| public int waitFor() throws InterruptedException { |
| while (!store.isComplete()) { |
| latch.await(1, TimeUnit.SECONDS); |
| } |
| return store.hasFailures() ? 1 : 0; |
| } |
| |
| /** |
| * Returns set of all test steps. |
| * |
| * @return set of steps |
| */ |
| public Set<Step> getSteps() { |
| return store.getSteps(); |
| } |
| |
| /** |
| * Returns a chronological list of step or group records. |
| * |
| * @return list of events |
| */ |
| List<StepEvent> getRecords() { |
| return store.getEvents(); |
| } |
| |
| /** |
| * Returns the status record of the specified test step. |
| * |
| * @param step test step or group |
| * @return step status record |
| */ |
| public Status getStatus(Step step) { |
| return store.getStatus(step); |
| } |
| |
| /** |
| * Adds the specified listener. |
| * |
| * @param listener step process listener |
| */ |
| public void addListener(StepProcessListener listener) { |
| listeners.add(checkNotNull(listener, "Listener cannot be null")); |
| } |
| |
| /** |
| * Removes the specified listener. |
| * |
| * @param listener step process listener |
| */ |
| public void removeListener(StepProcessListener listener) { |
| listeners.remove(checkNotNull(listener, "Listener cannot be null")); |
| } |
| |
| /** |
| * Executes the set of roots in the scope of the specified group or globally |
| * if no group is given. |
| * |
| * @param group optional group |
| */ |
| private void executeRoots(Group group) { |
| // FIXME: add ability to skip past completed steps |
| Set<Step> steps = |
| group != null ? group.children() : processFlow.getVertexes(); |
| steps.forEach(step -> { |
| if (processFlow.getEdgesFrom(step).isEmpty() && step.group() == group) { |
| execute(step); |
| } |
| }); |
| } |
| |
| /** |
| * Executes the specified step. |
| * |
| * @param step step to execute |
| */ |
| private synchronized void execute(Step step) { |
| Directive directive = nextAction(step); |
| if (directive == RUN) { |
| store.markStarted(step); |
| if (step instanceof Group) { |
| Group group = (Group) step; |
| delegate.onStart(group, null); |
| executeRoots(group); |
| } else { |
| executor.execute(new StepProcessor(step, logDir, delegate, |
| substitute(step.command()))); |
| } |
| } else if (directive == SKIP) { |
| skipStep(step); |
| } |
| } |
| |
| /** |
| * Recursively skips the specified step or group and any steps/groups within. |
| * |
| * @param step step or group |
| */ |
| private void skipStep(Step step) { |
| if (step instanceof Group) { |
| Group group = (Group) step; |
| store.markComplete(step, SKIPPED); |
| group.children().forEach(this::skipStep); |
| } |
| delegate.onCompletion(step, SKIPPED); |
| |
| } |
| |
| /** |
| * Determines the state of the specified step. |
| * |
| * @param step test step |
| * @return state of the step process |
| */ |
| private Directive nextAction(Step step) { |
| Status status = store.getStatus(step); |
| if (status != WAITING) { |
| return NOOP; |
| } |
| |
| for (Dependency dependency : processFlow.getEdgesFrom(step)) { |
| Status depStatus = store.getStatus(dependency.dst()); |
| if (depStatus == WAITING || depStatus == IN_PROGRESS) { |
| return NOOP; |
| } else if (((depStatus == FAILED || depStatus == SKIPPED) && !dependency.isSoft()) || |
| (step.group() != null && store.getStatus(step.group()) == SKIPPED)) { |
| return SKIP; |
| } |
| } |
| return RUN; |
| } |
| |
| /** |
| * Executes the successors to the specified step. |
| * |
| * @param step step whose successors are to be executed |
| */ |
| private void executeSucessors(Step step) { |
| processFlow.getEdgesTo(step).forEach(dependency -> execute(dependency.src())); |
| completeParentIfNeeded(step.group()); |
| } |
| |
| /** |
| * Checks whether the specified parent group, if any, should be marked |
| * as complete. |
| * |
| * @param group parent group that should be checked |
| */ |
| private synchronized void completeParentIfNeeded(Group group) { |
| if (group != null && getStatus(group) == IN_PROGRESS) { |
| boolean done = true; |
| boolean failed = false; |
| for (Step child : group.children()) { |
| Status status = store.getStatus(child); |
| done = done && (status == SUCCEEDED || status == FAILED || status == SKIPPED); |
| failed = failed || status == FAILED; |
| } |
| if (done) { |
| delegate.onCompletion(group, failed ? FAILED : SUCCEEDED); |
| } |
| } |
| } |
| |
| /** |
| * Expands the var references with values from the properties map. |
| * |
| * @param string string to perform substitutions on |
| */ |
| private String substitute(String string) { |
| StringBuilder sb = new StringBuilder(); |
| int start, end, last = 0; |
| while ((start = string.indexOf(PROP_START, last)) >= 0) { |
| end = string.indexOf(PROP_END, start + PROP_START.length()); |
| checkArgument(end > start, "Malformed property in %s", string); |
| sb.append(string.substring(last, start)); |
| String prop = string.substring(start + PROP_START.length(), end); |
| String value = properties.get(prop); |
| sb.append(value != null ? value : ""); |
| last = end + 1; |
| } |
| sb.append(string.substring(last)); |
| return sb.toString().replace('\n', ' ').replace('\r', ' '); |
| } |
| |
| /** |
| * Scrapes the line of output for any variables to be captured and posted |
| * in the properties for later use. |
| * |
| * @param line line of output to scrape for property exports |
| */ |
| private void scrapeForVariables(String line) { |
| Matcher matcher = PROP_ERE.matcher(line); |
| if (matcher.matches()) { |
| String prop = matcher.group(1); |
| String value = matcher.group(2); |
| properties.put(prop, value); |
| } |
| } |
| |
| |
| /** |
| * Prints formatted output. |
| * |
| * @param format printf format string |
| * @param args arguments to be printed |
| */ |
| public static void print(String format, Object... args) { |
| System.out.println(String.format(format, args)); |
| } |
| |
| /** |
| * Internal delegate to monitor the process execution. |
| */ |
| private class Delegate implements StepProcessListener { |
| @Override |
| public void onStart(Step step, String command) { |
| listeners.forEach(listener -> listener.onStart(step, command)); |
| } |
| |
| @Override |
| public void onCompletion(Step step, Status status) { |
| store.markComplete(step, status); |
| listeners.forEach(listener -> listener.onCompletion(step, status)); |
| executeSucessors(step); |
| if (store.isComplete()) { |
| latch.countDown(); |
| } |
| } |
| |
| @Override |
| public void onOutput(Step step, String line) { |
| scrapeForVariables(line); |
| listeners.forEach(listener -> listener.onOutput(step, line)); |
| } |
| } |
| |
| } |