blob: a26e9feb857e875df6e5ad0421197eb4516b5126 [file] [log] [blame]
Thomas Vachuskaf9c84362015-04-15 11:20:45 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.stc;
17
Thomas Vachuska50ec1af2015-06-02 00:42:52 -070018import com.google.common.collect.ImmutableList;
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070019import com.google.common.collect.Sets;
20
21import java.io.File;
Thomas Vachuska50ec1af2015-06-02 00:42:52 -070022import java.util.List;
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070023import java.util.Set;
24import java.util.concurrent.CountDownLatch;
25import java.util.concurrent.ExecutorService;
26
27import static com.google.common.base.Preconditions.checkNotNull;
28import static java.util.concurrent.Executors.newFixedThreadPool;
29import static org.onlab.stc.Coordinator.Directive.*;
30import static org.onlab.stc.Coordinator.Status.*;
31
32/**
33 * Coordinates execution of a scenario process flow.
34 */
35public class Coordinator {
36
37 private static final int MAX_THREADS = 16;
38
39 private final ExecutorService executor = newFixedThreadPool(MAX_THREADS);
40
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070041 private final ProcessFlow processFlow;
42
43 private final StepProcessListener delegate;
44 private final CountDownLatch latch;
45 private final ScenarioStore store;
46
47 private final Set<StepProcessListener> listeners = Sets.newConcurrentHashSet();
48 private File logDir;
49
50 /**
51 * Represents action to be taken on a test step.
52 */
53 public enum Directive {
54 NOOP, RUN, SKIP
55 }
56
57 /**
58 * Represents processor state.
59 */
60 public enum Status {
Thomas Vachuska50ec1af2015-06-02 00:42:52 -070061 WAITING, IN_PROGRESS, SUCCEEDED, FAILED, SKIPPED
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070062 }
63
64 /**
65 * Creates a process flow coordinator.
66 *
67 * @param scenario test scenario to coordinate
68 * @param processFlow process flow to coordinate
69 * @param logDir scenario log directory
70 */
71 public Coordinator(Scenario scenario, ProcessFlow processFlow, File logDir) {
Thomas Vachuskaf9c84362015-04-15 11:20:45 -070072 this.processFlow = processFlow;
73 this.logDir = logDir;
74 this.store = new ScenarioStore(processFlow, logDir, scenario.name());
75 this.delegate = new Delegate();
76 this.latch = new CountDownLatch(store.getSteps().size());
77 }
78
79 /**
Thomas Vachuska50ec1af2015-06-02 00:42:52 -070080 * Resets any previously accrued status and events.
81 */
82 public void reset() {
83 store.reset();
84 }
85
86 /**
87 * Resets all previously accrued status and events for steps that lie
88 * in the range between the steps or groups whose names match the specified
89 * patterns.
90 *
91 * @param runFromPatterns list of starting step patterns
92 * @param runToPatterns list of ending step patterns
93 */
94 public void reset(List<String> runFromPatterns, List<String> runToPatterns) {
95 List<Step> fromSteps = matchSteps(runFromPatterns);
96 List<Step> toSteps = matchSteps(runToPatterns);
97
98 // FIXME: implement this
99 }
100
101 /**
102 * Returns a list of steps that match the specified list of patterns.
103 *
104 * @param runToPatterns list of patterns
105 * @return list of steps with matching names
106 */
107 private List<Step> matchSteps(List<String> runToPatterns) {
108 ImmutableList.Builder<Step> builder = ImmutableList.builder();
109 store.getSteps().forEach(step -> {
110 runToPatterns.forEach(p -> {
111 if (step.name().matches(p)) {
112 builder.add(step);
113 }
114 });
115 });
116 return builder.build();
117 }
118
119 /**
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700120 * Starts execution of the process flow graph.
121 */
122 public void start() {
123 executeRoots(null);
124 }
125
126 /**
127 * Wants for completion of the entire process flow.
128 *
129 * @return exit code to use
130 * @throws InterruptedException if interrupted while waiting for completion
131 */
132 public int waitFor() throws InterruptedException {
133 latch.await();
134 return store.hasFailures() ? 1 : 0;
135 }
136
137 /**
138 * Returns set of all test steps.
139 *
140 * @return set of steps
141 */
142 public Set<Step> getSteps() {
143 return store.getSteps();
144 }
145
146 /**
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700147 * Returns a chronological list of step or group records.
148 *
149 * @return list of events
150 */
151 List<StepEvent> getRecords() {
152 return store.getEvents();
153 }
154
155 /**
156 * Returns the status record of the specified test step.
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700157 *
158 * @param step test step or group
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700159 * @return step status record
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700160 */
161 public Status getStatus(Step step) {
162 return store.getStatus(step);
163 }
164
165 /**
166 * Adds the specified listener.
167 *
168 * @param listener step process listener
169 */
170 public void addListener(StepProcessListener listener) {
171 listeners.add(checkNotNull(listener, "Listener cannot be null"));
172 }
173
174 /**
175 * Removes the specified listener.
176 *
177 * @param listener step process listener
178 */
179 public void removeListener(StepProcessListener listener) {
180 listeners.remove(checkNotNull(listener, "Listener cannot be null"));
181 }
182
183 /**
184 * Executes the set of roots in the scope of the specified group or globally
185 * if no group is given.
186 *
187 * @param group optional group
188 */
189 private void executeRoots(Group group) {
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700190 // FIXME: add ability to skip past completed steps
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700191 Set<Step> steps =
192 group != null ? group.children() : processFlow.getVertexes();
193 steps.forEach(step -> {
194 if (processFlow.getEdgesFrom(step).isEmpty() && step.group() == group) {
195 execute(step);
196 }
197 });
198 }
199
200 /**
201 * Executes the specified step.
202 *
203 * @param step step to execute
204 */
205 private synchronized void execute(Step step) {
206 Directive directive = nextAction(step);
Thomas Vachuska86439372015-06-05 09:21:32 -0700207 if (directive == RUN) {
Thomas Vachuska50ec1af2015-06-02 00:42:52 -0700208 store.markStarted(step);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700209 if (step instanceof Group) {
210 Group group = (Group) step;
211 delegate.onStart(group);
Thomas Vachuska86439372015-06-05 09:21:32 -0700212 executeRoots(group);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700213 } else {
Thomas Vachuska86439372015-06-05 09:21:32 -0700214 executor.execute(new StepProcessor(step, logDir, delegate));
215 }
216 } else if (directive == SKIP) {
217 if (step instanceof Group) {
218 Group group = (Group) step;
219 group.children().forEach(child -> delegate.onCompletion(child, SKIPPED));
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700220 }
Thomas Vachuskaa0b50dd2015-06-09 09:55:02 -0700221 delegate.onCompletion(step, SKIPPED);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700222 }
223 }
224
225 /**
226 * Determines the state of the specified step.
227 *
228 * @param step test step
229 * @return state of the step process
230 */
231 private Directive nextAction(Step step) {
232 Status status = store.getStatus(step);
233 if (status != WAITING) {
234 return NOOP;
235 }
236
237 for (Dependency dependency : processFlow.getEdgesFrom(step)) {
238 Status depStatus = store.getStatus(dependency.dst());
239 if (depStatus == WAITING || depStatus == IN_PROGRESS) {
240 return NOOP;
Thomas Vachuska86439372015-06-05 09:21:32 -0700241 } else if ((depStatus == FAILED || depStatus == SKIPPED) &&
242 !dependency.isSoft()) {
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700243 return SKIP;
244 }
245 }
246 return RUN;
247 }
248
249 /**
250 * Executes the successors to the specified step.
251 *
252 * @param step step whose successors are to be executed
253 */
254 private void executeSucessors(Step step) {
255 processFlow.getEdgesTo(step).forEach(dependency -> execute(dependency.src()));
256 completeParentIfNeeded(step.group());
257 }
258
259 /**
260 * Checks whether the specified parent group, if any, should be marked
261 * as complete.
262 *
263 * @param group parent group that should be checked
264 */
265 private synchronized void completeParentIfNeeded(Group group) {
266 if (group != null && getStatus(group) == IN_PROGRESS) {
267 boolean done = true;
268 boolean failed = false;
269 for (Step child : group.children()) {
270 Status status = store.getStatus(child);
Thomas Vachuskaa0b50dd2015-06-09 09:55:02 -0700271 done = done && (status == SUCCEEDED || status == FAILED || status == SKIPPED);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700272 failed = failed || status == FAILED;
273 }
274 if (done) {
Thomas Vachuska86439372015-06-05 09:21:32 -0700275 delegate.onCompletion(group, failed ? FAILED : SUCCEEDED);
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700276 }
277 }
278 }
279
280 /**
281 * Prints formatted output.
282 *
283 * @param format printf format string
284 * @param args arguments to be printed
285 */
286 public static void print(String format, Object... args) {
287 System.out.println(String.format(format, args));
288 }
289
290 /**
291 * Internal delegate to monitor the process execution.
292 */
293 private class Delegate implements StepProcessListener {
294
295 @Override
296 public void onStart(Step step) {
297 listeners.forEach(listener -> listener.onStart(step));
298 }
299
300 @Override
Thomas Vachuska86439372015-06-05 09:21:32 -0700301 public void onCompletion(Step step, Status status) {
302 store.markComplete(step, status);
303 listeners.forEach(listener -> listener.onCompletion(step, status));
Thomas Vachuskaf9c84362015-04-15 11:20:45 -0700304 executeSucessors(step);
305 latch.countDown();
306 }
307
308 @Override
309 public void onOutput(Step step, String line) {
310 listeners.forEach(listener -> listener.onOutput(step, line));
311 }
312
313 }
314
315}