/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onlab.stc;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.stc.Compiler.PROP_END;
import static org.onlab.stc.Compiler.PROP_START;
import static org.onlab.stc.Coordinator.Directive.*;
import static org.onlab.stc.Coordinator.Status.*;

40/**
41 * Coordinates execution of a scenario process flow.
42 */
43public class Coordinator {
44
Thomas Vachuska54316c22015-11-04 14:02:02 -080045 private static final int MAX_THREADS = 64;
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070046
47 private final ExecutorService executor = newFixedThreadPool(MAX_THREADS);
48
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070049 private final ProcessFlow processFlow;
50
51 private final StepProcessListener delegate;
52 private final CountDownLatch latch;
53 private final ScenarioStore store;
54
Thomas Vachuskab51b8bc2015-07-27 08:37:12 -070055 private static final Pattern PROP_ERE = Pattern.compile("^@stc ([a-zA-Z0-9_.]+)=(.*$)");
56 private final Map<String, String> properties = Maps.newConcurrentMap();
Thomas Vachuskab51b8bc2015-07-27 08:37:12 -070057
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070058 private final Set<StepProcessListener> listeners = Sets.newConcurrentHashSet();
59 private File logDir;
60
61 /**
62 * Represents action to be taken on a test step.
63 */
64 public enum Directive {
65 NOOP, RUN, SKIP
66 }
67
68 /**
69 * Represents processor state.
70 */
71 public enum Status {
Thomas Vachuska50ec1af2015-06-02 00:42:52 -070072 WAITING, IN_PROGRESS, SUCCEEDED, FAILED, SKIPPED
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070073 }
74
75 /**
76 * Creates a process flow coordinator.
77 *
78 * @param scenario test scenario to coordinate
79 * @param processFlow process flow to coordinate
80 * @param logDir scenario log directory
81 */
82 public Coordinator(Scenario scenario, ProcessFlow processFlow, File logDir) {
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070083 this.processFlow = processFlow;
84 this.logDir = logDir;
85 this.store = new ScenarioStore(processFlow, logDir, scenario.name());
86 this.delegate = new Delegate();
Thomas Vachuska1b403a52015-08-26 11:30:48 -070087 this.latch = new CountDownLatch(1);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070088 }
89
90 /**
Thomas Vachuska50ec1af2015-06-02 00:42:52 -070091 * Resets any previously accrued status and events.
92 */
93 public void reset() {
94 store.reset();
95 }
96
97 /**
98 * Resets all previously accrued status and events for steps that lie
99 * in the range between the steps or groups whose names match the specified
100 * patterns.
101 *
102 * @param runFromPatterns list of starting step patterns
103 * @param runToPatterns list of ending step patterns
104 */
105 public void reset(List<String> runFromPatterns, List<String> runToPatterns) {
106 List<Step> fromSteps = matchSteps(runFromPatterns);
107 List<Step> toSteps = matchSteps(runToPatterns);
108
109 // FIXME: implement this
110 }
111
112 /**
Thomas Vachuskad542cc42015-09-11 16:15:36 -0700113 * Returns number of milliseconds it took to execute.
114 *
115 * @return number of millis elapsed during the run
116 */
117 public long duration() {
118 return store.endTime() - store.startTime();
119 }
120
121 /**
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700122 * Returns a list of steps that match the specified list of patterns.
123 *
124 * @param runToPatterns list of patterns
125 * @return list of steps with matching names
126 */
127 private List<Step> matchSteps(List<String> runToPatterns) {
128 ImmutableList.Builder<Step> builder = ImmutableList.builder();
129 store.getSteps().forEach(step -> {
130 runToPatterns.forEach(p -> {
131 if (step.name().matches(p)) {
132 builder.add(step);
133 }
134 });
135 });
136 return builder.build();
137 }
138
139 /**
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700140 * Starts execution of the process flow graph.
141 */
142 public void start() {
143 executeRoots(null);
144 }
145
146 /**
Thomas Vachuska54316c22015-11-04 14:02:02 -0800147 * Waits for completion of the entire process flow.
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700148 *
149 * @return exit code to use
150 * @throws InterruptedException if interrupted while waiting for completion
151 */
152 public int waitFor() throws InterruptedException {
Thomas Vachuska54316c22015-11-04 14:02:02 -0800153 while (!store.isComplete()) {
154 latch.await(1, TimeUnit.SECONDS);
155 }
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700156 return store.hasFailures() ? 1 : 0;
157 }
158
159 /**
160 * Returns set of all test steps.
161 *
162 * @return set of steps
163 */
164 public Set<Step> getSteps() {
165 return store.getSteps();
166 }
167
168 /**
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700169 * Returns a chronological list of step or group records.
170 *
171 * @return list of events
172 */
173 List<StepEvent> getRecords() {
174 return store.getEvents();
175 }
176
177 /**
178 * Returns the status record of the specified test step.
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700179 *
180 * @param step test step or group
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700181 * @return step status record
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700182 */
183 public Status getStatus(Step step) {
184 return store.getStatus(step);
185 }
186
187 /**
188 * Adds the specified listener.
189 *
190 * @param listener step process listener
191 */
192 public void addListener(StepProcessListener listener) {
193 listeners.add(checkNotNull(listener, "Listener cannot be null"));
194 }
195
196 /**
197 * Removes the specified listener.
198 *
199 * @param listener step process listener
200 */
201 public void removeListener(StepProcessListener listener) {
202 listeners.remove(checkNotNull(listener, "Listener cannot be null"));
203 }
204
205 /**
206 * Executes the set of roots in the scope of the specified group or globally
207 * if no group is given.
208 *
209 * @param group optional group
210 */
211 private void executeRoots(Group group) {
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700212 // FIXME: add ability to skip past completed steps
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700213 Set<Step> steps =
214 group != null ? group.children() : processFlow.getVertexes();
215 steps.forEach(step -> {
216 if (processFlow.getEdgesFrom(step).isEmpty() && step.group() == group) {
217 execute(step);
218 }
219 });
220 }
221
222 /**
223 * Executes the specified step.
224 *
225 * @param step step to execute
226 */
227 private synchronized void execute(Step step) {
228 Directive directive = nextAction(step);
Thomas Vachuska86439372015-06-05 09:21:32 -0700229 if (directive == RUN) {
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700230 store.markStarted(step);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700231 if (step instanceof Group) {
232 Group group = (Group) step;
Thomas Vachuskab51b8bc2015-07-27 08:37:12 -0700233 delegate.onStart(group, null);
Thomas Vachuska86439372015-06-05 09:21:32 -0700234 executeRoots(group);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700235 } else {
Thomas Vachuskab51b8bc2015-07-27 08:37:12 -0700236 executor.execute(new StepProcessor(step, logDir, delegate,
Thomas Vachuskae2de8ee2015-08-25 16:14:28 -0700237 substitute(step.command())));
Thomas Vachuska86439372015-06-05 09:21:32 -0700238 }
239 } else if (directive == SKIP) {
Thomas Vachuska54316c22015-11-04 14:02:02 -0800240 skipStep(step);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700241 }
242 }
243
244 /**
Thomas Vachuska54316c22015-11-04 14:02:02 -0800245 * Recursively skips the specified step or group and any steps/groups within.
246 *
247 * @param step step or group
248 */
249 private void skipStep(Step step) {
250 if (step instanceof Group) {
251 Group group = (Group) step;
252 store.markComplete(step, SKIPPED);
253 group.children().forEach(this::skipStep);
254 }
255 delegate.onCompletion(step, SKIPPED);
256
257 }
258
259 /**
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700260 * Determines the state of the specified step.
261 *
262 * @param step test step
263 * @return state of the step process
264 */
265 private Directive nextAction(Step step) {
266 Status status = store.getStatus(step);
267 if (status != WAITING) {
268 return NOOP;
269 }
270
271 for (Dependency dependency : processFlow.getEdgesFrom(step)) {
272 Status depStatus = store.getStatus(dependency.dst());
273 if (depStatus == WAITING || depStatus == IN_PROGRESS) {
274 return NOOP;
Thomas Vachuska54316c22015-11-04 14:02:02 -0800275 } else if (((depStatus == FAILED || depStatus == SKIPPED) && !dependency.isSoft()) ||
276 (step.group() != null && store.getStatus(step.group()) == SKIPPED)) {
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700277 return SKIP;
278 }
279 }
280 return RUN;
281 }
282
283 /**
284 * Executes the successors to the specified step.
285 *
286 * @param step step whose successors are to be executed
287 */
288 private void executeSucessors(Step step) {
289 processFlow.getEdgesTo(step).forEach(dependency -> execute(dependency.src()));
290 completeParentIfNeeded(step.group());
291 }
292
293 /**
294 * Checks whether the specified parent group, if any, should be marked
295 * as complete.
296 *
297 * @param group parent group that should be checked
298 */
299 private synchronized void completeParentIfNeeded(Group group) {
300 if (group != null && getStatus(group) == IN_PROGRESS) {
301 boolean done = true;
302 boolean failed = false;
303 for (Step child : group.children()) {
304 Status status = store.getStatus(child);
Thomas Vachuskaa0b50dd2015-06-09 09:55:02 -0700305 done = done && (status == SUCCEEDED || status == FAILED || status == SKIPPED);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700306 failed = failed || status == FAILED;
307 }
308 if (done) {
Thomas Vachuska86439372015-06-05 09:21:32 -0700309 delegate.onCompletion(group, failed ? FAILED : SUCCEEDED);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700310 }
311 }
312 }
313
314 /**
Thomas Vachuskab51b8bc2015-07-27 08:37:12 -0700315 * Expands the var references with values from the properties map.
316 *
317 * @param string string to perform substitutions on
318 */
319 private String substitute(String string) {
320 StringBuilder sb = new StringBuilder();
321 int start, end, last = 0;
322 while ((start = string.indexOf(PROP_START, last)) >= 0) {
323 end = string.indexOf(PROP_END, start + PROP_START.length());
324 checkArgument(end > start, "Malformed property in %s", string);
325 sb.append(string.substring(last, start));
326 String prop = string.substring(start + PROP_START.length(), end);
327 String value = properties.get(prop);
328 sb.append(value != null ? value : "");
329 last = end + 1;
330 }
331 sb.append(string.substring(last));
332 return sb.toString().replace('\n', ' ').replace('\r', ' ');
333 }
334
335 /**
336 * Scrapes the line of output for any variables to be captured and posted
337 * in the properties for later use.
338 *
339 * @param line line of output to scrape for property exports
340 */
341 private void scrapeForVariables(String line) {
342 Matcher matcher = PROP_ERE.matcher(line);
343 if (matcher.matches()) {
344 String prop = matcher.group(1);
345 String value = matcher.group(2);
346 properties.put(prop, value);
347 }
348 }
349
350
351 /**
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700352 * Prints formatted output.
353 *
354 * @param format printf format string
355 * @param args arguments to be printed
356 */
357 public static void print(String format, Object... args) {
358 System.out.println(String.format(format, args));
359 }
360
361 /**
362 * Internal delegate to monitor the process execution.
363 */
364 private class Delegate implements StepProcessListener {
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700365 @Override
Thomas Vachuskab51b8bc2015-07-27 08:37:12 -0700366 public void onStart(Step step, String command) {
367 listeners.forEach(listener -> listener.onStart(step, command));
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700368 }
369
370 @Override
Thomas Vachuska86439372015-06-05 09:21:32 -0700371 public void onCompletion(Step step, Status status) {
372 store.markComplete(step, status);
373 listeners.forEach(listener -> listener.onCompletion(step, status));
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700374 executeSucessors(step);
Thomas Vachuska1b403a52015-08-26 11:30:48 -0700375 if (store.isComplete()) {
376 latch.countDown();
377 }
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700378 }
379
380 @Override
381 public void onOutput(Step step, String line) {
Thomas Vachuskab51b8bc2015-07-27 08:37:12 -0700382 scrapeForVariables(line);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700383 listeners.forEach(listener -> listener.onOutput(step, line));
384 }
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700385 }
386
387}