blob: 4ffa4af01201de20bb7d2e7fcccedb05c99a1f07 [file] [log] [blame]
Thomas Vachuskaf9c84362015-04-15 11:20:45 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.stc;
17
18import com.google.common.collect.Sets;
19
20import java.io.File;
21import java.util.Set;
22import java.util.concurrent.CountDownLatch;
23import java.util.concurrent.ExecutorService;
24
25import static com.google.common.base.Preconditions.checkNotNull;
26import static java.util.concurrent.Executors.newFixedThreadPool;
27import static org.onlab.stc.Coordinator.Directive.*;
28import static org.onlab.stc.Coordinator.Status.*;
29
30/**
31 * Coordinates execution of a scenario process flow.
32 */
33public class Coordinator {
34
35 private static final int MAX_THREADS = 16;
36
37 private final ExecutorService executor = newFixedThreadPool(MAX_THREADS);
38
39 private final Scenario scenario;
40 private final ProcessFlow processFlow;
41
42 private final StepProcessListener delegate;
43 private final CountDownLatch latch;
44 private final ScenarioStore store;
45
46 private final Set<StepProcessListener> listeners = Sets.newConcurrentHashSet();
47 private File logDir;
48
49 /**
50 * Represents action to be taken on a test step.
51 */
52 public enum Directive {
53 NOOP, RUN, SKIP
54 }
55
56 /**
57 * Represents processor state.
58 */
59 public enum Status {
60 WAITING, IN_PROGRESS, SUCCEEDED, FAILED
61 }
62
63 /**
64 * Creates a process flow coordinator.
65 *
66 * @param scenario test scenario to coordinate
67 * @param processFlow process flow to coordinate
68 * @param logDir scenario log directory
69 */
70 public Coordinator(Scenario scenario, ProcessFlow processFlow, File logDir) {
71 this.scenario = scenario;
72 this.processFlow = processFlow;
73 this.logDir = logDir;
74 this.store = new ScenarioStore(processFlow, logDir, scenario.name());
75 this.delegate = new Delegate();
76 this.latch = new CountDownLatch(store.getSteps().size());
77 }
78
79 /**
80 * Starts execution of the process flow graph.
81 */
82 public void start() {
83 executeRoots(null);
84 }
85
86 /**
87 * Wants for completion of the entire process flow.
88 *
89 * @return exit code to use
90 * @throws InterruptedException if interrupted while waiting for completion
91 */
92 public int waitFor() throws InterruptedException {
93 latch.await();
94 return store.hasFailures() ? 1 : 0;
95 }
96
97 /**
98 * Returns set of all test steps.
99 *
100 * @return set of steps
101 */
102 public Set<Step> getSteps() {
103 return store.getSteps();
104 }
105
106 /**
107 * Returns the status of the specified test step.
108 *
109 * @param step test step or group
110 * @return step status
111 */
112 public Status getStatus(Step step) {
113 return store.getStatus(step);
114 }
115
116 /**
117 * Adds the specified listener.
118 *
119 * @param listener step process listener
120 */
121 public void addListener(StepProcessListener listener) {
122 listeners.add(checkNotNull(listener, "Listener cannot be null"));
123 }
124
125 /**
126 * Removes the specified listener.
127 *
128 * @param listener step process listener
129 */
130 public void removeListener(StepProcessListener listener) {
131 listeners.remove(checkNotNull(listener, "Listener cannot be null"));
132 }
133
134 /**
135 * Executes the set of roots in the scope of the specified group or globally
136 * if no group is given.
137 *
138 * @param group optional group
139 */
140 private void executeRoots(Group group) {
141 Set<Step> steps =
142 group != null ? group.children() : processFlow.getVertexes();
143 steps.forEach(step -> {
144 if (processFlow.getEdgesFrom(step).isEmpty() && step.group() == group) {
145 execute(step);
146 }
147 });
148 }
149
150 /**
151 * Executes the specified step.
152 *
153 * @param step step to execute
154 */
155 private synchronized void execute(Step step) {
156 Directive directive = nextAction(step);
157 if (directive == RUN || directive == SKIP) {
158 store.updateStatus(step, IN_PROGRESS);
159 if (step instanceof Group) {
160 Group group = (Group) step;
161 delegate.onStart(group);
162 if (directive == RUN) {
163 executeRoots(group);
164 } else {
165 group.children().forEach(child -> delegate.onCompletion(child, 1));
166 }
167 } else {
168 executor.execute(new StepProcessor(step, directive == SKIP,
169 logDir, delegate));
170 }
171 }
172 }
173
174 /**
175 * Determines the state of the specified step.
176 *
177 * @param step test step
178 * @return state of the step process
179 */
180 private Directive nextAction(Step step) {
181 Status status = store.getStatus(step);
182 if (status != WAITING) {
183 return NOOP;
184 }
185
186 for (Dependency dependency : processFlow.getEdgesFrom(step)) {
187 Status depStatus = store.getStatus(dependency.dst());
188 if (depStatus == WAITING || depStatus == IN_PROGRESS) {
189 return NOOP;
190 } else if (depStatus == FAILED && !dependency.isSoft()) {
191 return SKIP;
192 }
193 }
194 return RUN;
195 }
196
197 /**
198 * Executes the successors to the specified step.
199 *
200 * @param step step whose successors are to be executed
201 */
202 private void executeSucessors(Step step) {
203 processFlow.getEdgesTo(step).forEach(dependency -> execute(dependency.src()));
204 completeParentIfNeeded(step.group());
205 }
206
207 /**
208 * Checks whether the specified parent group, if any, should be marked
209 * as complete.
210 *
211 * @param group parent group that should be checked
212 */
213 private synchronized void completeParentIfNeeded(Group group) {
214 if (group != null && getStatus(group) == IN_PROGRESS) {
215 boolean done = true;
216 boolean failed = false;
217 for (Step child : group.children()) {
218 Status status = store.getStatus(child);
219 done = done && (status == SUCCEEDED || status == FAILED);
220 failed = failed || status == FAILED;
221 }
222 if (done) {
223 delegate.onCompletion(group, failed ? 1 : 0);
224 }
225 }
226 }
227
228 /**
229 * Prints formatted output.
230 *
231 * @param format printf format string
232 * @param args arguments to be printed
233 */
234 public static void print(String format, Object... args) {
235 System.out.println(String.format(format, args));
236 }
237
238 /**
239 * Internal delegate to monitor the process execution.
240 */
241 private class Delegate implements StepProcessListener {
242
243 @Override
244 public void onStart(Step step) {
245 listeners.forEach(listener -> listener.onStart(step));
246 }
247
248 @Override
249 public void onCompletion(Step step, int exitCode) {
250 store.updateStatus(step, exitCode == 0 ? SUCCEEDED : FAILED);
251 listeners.forEach(listener -> listener.onCompletion(step, exitCode));
252 executeSucessors(step);
253 latch.countDown();
254 }
255
256 @Override
257 public void onOutput(Step step, String line) {
258 listeners.forEach(listener -> listener.onOutput(step, line));
259 }
260
261 }
262
263}