blob: 209de8491fdab97999c4aead0ff9753706357570 [file] [log] [blame]
Thomas Vachuskaf9c84362015-04-15 11:20:45 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.stc;
17
Thomas Vachuska50ec1af2015-06-02 00:42:52 -070018import com.google.common.collect.ImmutableList;
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070019import com.google.common.collect.Sets;
20
21import java.io.File;
Thomas Vachuska50ec1af2015-06-02 00:42:52 -070022import java.util.List;
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070023import java.util.Set;
24import java.util.concurrent.CountDownLatch;
25import java.util.concurrent.ExecutorService;
26
27import static com.google.common.base.Preconditions.checkNotNull;
28import static java.util.concurrent.Executors.newFixedThreadPool;
29import static org.onlab.stc.Coordinator.Directive.*;
30import static org.onlab.stc.Coordinator.Status.*;
31
32/**
33 * Coordinates execution of a scenario process flow.
34 */
35public class Coordinator {
36
37 private static final int MAX_THREADS = 16;
38
39 private final ExecutorService executor = newFixedThreadPool(MAX_THREADS);
40
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070041 private final ProcessFlow processFlow;
42
43 private final StepProcessListener delegate;
44 private final CountDownLatch latch;
45 private final ScenarioStore store;
46
47 private final Set<StepProcessListener> listeners = Sets.newConcurrentHashSet();
48 private File logDir;
49
50 /**
51 * Represents action to be taken on a test step.
52 */
53 public enum Directive {
54 NOOP, RUN, SKIP
55 }
56
57 /**
58 * Represents processor state.
59 */
60 public enum Status {
Thomas Vachuska50ec1af2015-06-02 00:42:52 -070061 WAITING, IN_PROGRESS, SUCCEEDED, FAILED, SKIPPED
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070062 }
63
64 /**
65 * Creates a process flow coordinator.
66 *
67 * @param scenario test scenario to coordinate
68 * @param processFlow process flow to coordinate
69 * @param logDir scenario log directory
70 */
71 public Coordinator(Scenario scenario, ProcessFlow processFlow, File logDir) {
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070072 this.processFlow = processFlow;
73 this.logDir = logDir;
74 this.store = new ScenarioStore(processFlow, logDir, scenario.name());
75 this.delegate = new Delegate();
76 this.latch = new CountDownLatch(store.getSteps().size());
77 }
78
79 /**
Thomas Vachuska50ec1af2015-06-02 00:42:52 -070080 * Resets any previously accrued status and events.
81 */
82 public void reset() {
83 store.reset();
84 }
85
86 /**
87 * Resets all previously accrued status and events for steps that lie
88 * in the range between the steps or groups whose names match the specified
89 * patterns.
90 *
91 * @param runFromPatterns list of starting step patterns
92 * @param runToPatterns list of ending step patterns
93 */
94 public void reset(List<String> runFromPatterns, List<String> runToPatterns) {
95 List<Step> fromSteps = matchSteps(runFromPatterns);
96 List<Step> toSteps = matchSteps(runToPatterns);
97
98 // FIXME: implement this
99 }
100
101 /**
102 * Returns a list of steps that match the specified list of patterns.
103 *
104 * @param runToPatterns list of patterns
105 * @return list of steps with matching names
106 */
107 private List<Step> matchSteps(List<String> runToPatterns) {
108 ImmutableList.Builder<Step> builder = ImmutableList.builder();
109 store.getSteps().forEach(step -> {
110 runToPatterns.forEach(p -> {
111 if (step.name().matches(p)) {
112 builder.add(step);
113 }
114 });
115 });
116 return builder.build();
117 }
118
119 /**
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700120 * Starts execution of the process flow graph.
121 */
122 public void start() {
123 executeRoots(null);
124 }
125
126 /**
127 * Wants for completion of the entire process flow.
128 *
129 * @return exit code to use
130 * @throws InterruptedException if interrupted while waiting for completion
131 */
132 public int waitFor() throws InterruptedException {
133 latch.await();
134 return store.hasFailures() ? 1 : 0;
135 }
136
137 /**
138 * Returns set of all test steps.
139 *
140 * @return set of steps
141 */
142 public Set<Step> getSteps() {
143 return store.getSteps();
144 }
145
146 /**
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700147 * Returns a chronological list of step or group records.
148 *
149 * @return list of events
150 */
151 List<StepEvent> getRecords() {
152 return store.getEvents();
153 }
154
155 /**
156 * Returns the status record of the specified test step.
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700157 *
158 * @param step test step or group
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700159 * @return step status record
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700160 */
161 public Status getStatus(Step step) {
162 return store.getStatus(step);
163 }
164
165 /**
166 * Adds the specified listener.
167 *
168 * @param listener step process listener
169 */
170 public void addListener(StepProcessListener listener) {
171 listeners.add(checkNotNull(listener, "Listener cannot be null"));
172 }
173
174 /**
175 * Removes the specified listener.
176 *
177 * @param listener step process listener
178 */
179 public void removeListener(StepProcessListener listener) {
180 listeners.remove(checkNotNull(listener, "Listener cannot be null"));
181 }
182
183 /**
184 * Executes the set of roots in the scope of the specified group or globally
185 * if no group is given.
186 *
187 * @param group optional group
188 */
189 private void executeRoots(Group group) {
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700190 // FIXME: add ability to skip past completed steps
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700191 Set<Step> steps =
192 group != null ? group.children() : processFlow.getVertexes();
193 steps.forEach(step -> {
194 if (processFlow.getEdgesFrom(step).isEmpty() && step.group() == group) {
195 execute(step);
196 }
197 });
198 }
199
200 /**
201 * Executes the specified step.
202 *
203 * @param step step to execute
204 */
205 private synchronized void execute(Step step) {
206 Directive directive = nextAction(step);
207 if (directive == RUN || directive == SKIP) {
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700208 store.markStarted(step);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700209 if (step instanceof Group) {
210 Group group = (Group) step;
211 delegate.onStart(group);
212 if (directive == RUN) {
213 executeRoots(group);
214 } else {
215 group.children().forEach(child -> delegate.onCompletion(child, 1));
216 }
217 } else {
218 executor.execute(new StepProcessor(step, directive == SKIP,
219 logDir, delegate));
220 }
221 }
222 }
223
224 /**
225 * Determines the state of the specified step.
226 *
227 * @param step test step
228 * @return state of the step process
229 */
230 private Directive nextAction(Step step) {
231 Status status = store.getStatus(step);
232 if (status != WAITING) {
233 return NOOP;
234 }
235
236 for (Dependency dependency : processFlow.getEdgesFrom(step)) {
237 Status depStatus = store.getStatus(dependency.dst());
238 if (depStatus == WAITING || depStatus == IN_PROGRESS) {
239 return NOOP;
240 } else if (depStatus == FAILED && !dependency.isSoft()) {
241 return SKIP;
242 }
243 }
244 return RUN;
245 }
246
247 /**
248 * Executes the successors to the specified step.
249 *
250 * @param step step whose successors are to be executed
251 */
252 private void executeSucessors(Step step) {
253 processFlow.getEdgesTo(step).forEach(dependency -> execute(dependency.src()));
254 completeParentIfNeeded(step.group());
255 }
256
257 /**
258 * Checks whether the specified parent group, if any, should be marked
259 * as complete.
260 *
261 * @param group parent group that should be checked
262 */
263 private synchronized void completeParentIfNeeded(Group group) {
264 if (group != null && getStatus(group) == IN_PROGRESS) {
265 boolean done = true;
266 boolean failed = false;
267 for (Step child : group.children()) {
268 Status status = store.getStatus(child);
269 done = done && (status == SUCCEEDED || status == FAILED);
270 failed = failed || status == FAILED;
271 }
272 if (done) {
273 delegate.onCompletion(group, failed ? 1 : 0);
274 }
275 }
276 }
277
278 /**
279 * Prints formatted output.
280 *
281 * @param format printf format string
282 * @param args arguments to be printed
283 */
284 public static void print(String format, Object... args) {
285 System.out.println(String.format(format, args));
286 }
287
288 /**
289 * Internal delegate to monitor the process execution.
290 */
291 private class Delegate implements StepProcessListener {
292
293 @Override
294 public void onStart(Step step) {
295 listeners.forEach(listener -> listener.onStart(step));
296 }
297
298 @Override
299 public void onCompletion(Step step, int exitCode) {
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700300 store.markComplete(step, exitCode == 0 ? SUCCEEDED : FAILED);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700301 listeners.forEach(listener -> listener.onCompletion(step, exitCode));
302 executeSucessors(step);
303 latch.countDown();
304 }
305
306 @Override
307 public void onOutput(Step step, String line) {
308 listeners.forEach(listener -> listener.onOutput(step, line));
309 }
310
311 }
312
313}