blob: 237f70edb699b46bbb0a98e4715aca5856579fcd [file] [log] [blame]
jaegonkim6a7b5242018-09-12 23:09:42 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.node.JsonNodeFactory;
19import com.google.common.collect.Lists;
jaegonkim6a7b5242018-09-12 23:09:42 +090020import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.LeadershipService;
22import org.onosproject.cluster.NodeId;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
jaegonkime0f45b52018-10-09 20:23:26 +090025import org.onosproject.store.service.StorageException;
jaegonkim6a7b5242018-09-12 23:09:42 +090026import org.onosproject.workflow.api.DefaultWorkplace;
27import org.onosproject.workflow.api.EventHintSupplier;
28import org.onosproject.workflow.api.EventTask;
jaegonkime0f45b52018-10-09 20:23:26 +090029import org.onosproject.workflow.api.JsonDataModelInjector;
jaegonkim6a7b5242018-09-12 23:09:42 +090030import org.onosproject.workflow.api.JsonDataModelTree;
jaegonkime0f45b52018-10-09 20:23:26 +090031import org.onosproject.workflow.api.ProgramCounter;
jaegonkim6a7b5242018-09-12 23:09:42 +090032import org.onosproject.workflow.api.SystemWorkflowContext;
33import org.onosproject.workflow.api.EventTimeoutTask;
34import org.onosproject.workflow.api.TimeoutTask;
35import org.onosproject.workflow.api.TimerChain;
36import org.onosproject.workflow.api.Worklet;
37import org.onosproject.workflow.api.Workflow;
38import org.onosproject.workflow.api.WorkflowContext;
39import org.onosproject.workflow.api.WorkflowData;
40import org.onosproject.workflow.api.ContextEventMapStore;
41import org.onosproject.workflow.api.WorkflowState;
42import org.onosproject.workflow.api.WorkflowStore;
43import org.onosproject.workflow.api.WorkflowBatchDelegate;
44import org.onosproject.workflow.api.WorkflowDataEvent;
45import org.onosproject.workflow.api.WorkflowDataListener;
46import org.onosproject.workflow.api.WorkflowException;
47import org.onosproject.workflow.api.HandlerTask;
48import org.onosproject.workflow.api.HandlerTaskBatchDelegate;
49import org.onosproject.workflow.api.Workplace;
50import org.onosproject.workflow.api.WorkplaceStore;
51import org.onosproject.workflow.api.WorkplaceStoreDelegate;
52import org.onosproject.workflow.api.WorkflowExecutionService;
53import org.onosproject.event.AbstractListenerManager;
54import org.onosproject.event.Event;
55import org.onosproject.net.intent.WorkPartitionService;
Ray Milkeydf521292018-10-04 15:13:33 -070056import org.osgi.service.component.annotations.Activate;
57import org.osgi.service.component.annotations.Component;
58import org.osgi.service.component.annotations.Deactivate;
59import org.osgi.service.component.annotations.Reference;
60import org.osgi.service.component.annotations.ReferenceCardinality;
jaegonkim6a7b5242018-09-12 23:09:42 +090061import org.slf4j.Logger;
62
63import java.util.Collection;
64import java.util.List;
65import java.util.Map;
66import java.util.Objects;
67import java.util.UUID;
68import java.util.concurrent.CompletableFuture;
69import java.util.concurrent.ExecutorService;
70import java.util.concurrent.ScheduledExecutorService;
71import java.util.stream.Collectors;
72
73import static java.util.concurrent.Executors.newFixedThreadPool;
74import static java.util.concurrent.Executors.newSingleThreadExecutor;
75import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
76import static org.onlab.util.Tools.groupedThreads;
77import static org.onosproject.workflow.api.WorkflowAttribute.REMOVE_AFTER_COMPLETE;
78import static org.slf4j.LoggerFactory.getLogger;
79
Ray Milkeydf521292018-10-04 15:13:33 -070080@Component(immediate = true, service = WorkflowExecutionService.class)
jaegonkim6a7b5242018-09-12 23:09:42 +090081public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, WorkflowDataListener>
82 implements WorkflowExecutionService {
83
84 protected static final Logger log = getLogger(WorkFlowEngine.class);
Ray Milkeydf521292018-10-04 15:13:33 -070085 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090086 protected CoreService coreService;
87
Ray Milkeydf521292018-10-04 15:13:33 -070088 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090089 protected ClusterService clusterService;
90
Ray Milkeydf521292018-10-04 15:13:33 -070091 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090092 protected LeadershipService leadershipService;
93
Ray Milkeydf521292018-10-04 15:13:33 -070094 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090095 protected WorkPartitionService partitionService;
96
Ray Milkeydf521292018-10-04 15:13:33 -070097 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090098 protected WorkplaceStore workplaceStore;
99
Ray Milkeydf521292018-10-04 15:13:33 -0700100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +0900101 protected WorkflowStore workflowStore;
102
Ray Milkeydf521292018-10-04 15:13:33 -0700103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +0900104 protected ContextEventMapStore eventMapStore;
105
106 private final WorkplaceStoreDelegate workplaceStoreDelegate = this::post;
107
108 private final WorkflowBatchDelegate workflowBatchDelegate = new InternalWorkflowBatchDelegate();
109 private final WorkflowAccumulator workflowAccumulator = new WorkflowAccumulator(workflowBatchDelegate);
110
111 private final HandlerTaskBatchDelegate eventtaskBatchDelegate = new InternalHandlerTaskBatchDelegate();
112 private final HandlerTaskAccumulator eventtaskAccumulator = new HandlerTaskAccumulator(eventtaskBatchDelegate);
113
114 private ExecutorService workflowBatchExecutor;
115 private ExecutorService workflowExecutor;
116
117 private ExecutorService handlerTaskBatchExecutor;
118 private ExecutorService handlerTaskExecutor;
119
120 private static final int DEFAULT_WORKFLOW_THREADS = 12;
121 private static final int DEFAULT_EVENTTASK_THREADS = 12;
122 private static final int MAX_REGISTER_EVENTMAP_WAITS = 10;
123
124 private ScheduledExecutorService eventMapTriggerExecutor;
125
126 private TimerChain timerChain = new TimerChain();
127
jaegonkime0f45b52018-10-09 20:23:26 +0900128 private JsonDataModelInjector dataModelInjector = new JsonDataModelInjector();
129
jaegonkim6a7b5242018-09-12 23:09:42 +0900130 public static final String APPID = "org.onosproject.workflow";
131 private ApplicationId appId;
132 private NodeId localNodeId;
133
134 @Activate
135 public void activate() {
136 appId = coreService.registerApplication(APPID);
137 workplaceStore.setDelegate(workplaceStoreDelegate);
138 localNodeId = clusterService.getLocalNode().id();
139 leadershipService.runForLeadership(appId.name());
140
141 workflowBatchExecutor = newSingleThreadExecutor(
142 groupedThreads("onos/workflow", "workflow-batch", log));
143 workflowExecutor = newFixedThreadPool(DEFAULT_WORKFLOW_THREADS,
144 groupedThreads("onos/workflow-exec", "worker-%d", log));
145 handlerTaskBatchExecutor = newSingleThreadExecutor(
146 groupedThreads("onos/workflow", "handlertask-batch", log));
147 handlerTaskExecutor = newFixedThreadPool(DEFAULT_EVENTTASK_THREADS,
148 groupedThreads("onos/handlertask-exec", "worker-%d", log));
149 eventMapTriggerExecutor = newSingleThreadScheduledExecutor(
150 groupedThreads("onos/workflow-engine", "eventmap-trigger-executor"));
151
152 (new WorkplaceWorkflow(this, workflowStore)).registerWorkflows();
153 JsonDataModelTree data = new JsonDataModelTree(JsonNodeFactory.instance.objectNode());
154 workplaceStore.registerWorkplace(Workplace.SYSTEM_WORKPLACE,
155 new DefaultWorkplace(Workplace.SYSTEM_WORKPLACE, data));
156
157 log.info("Started");
158 }
159
160 @Deactivate
161 public void deactivate() {
162 leadershipService.withdraw(appId.name());
163 workplaceStore.unsetDelegate(workplaceStoreDelegate);
164 workflowBatchExecutor.shutdown();
165 workflowExecutor.shutdown();
166 handlerTaskBatchExecutor.shutdown();
167 handlerTaskExecutor.shutdown();
168 eventMapTriggerExecutor.shutdown();
169 log.info("Stopped");
170 }
171
172 @Override
173 public void execInitWorklet(WorkflowContext context) {
174
175 Workflow workflow = workflowStore.get(context.workflowId());
176 if (workflow == null) {
177 log.error("Invalid workflow {}", context.workflowId());
178 return;
179 }
180
181 initWorkletExecution(context);
182 try {
183 Worklet initWorklet = workflow.init(context);
184 if (initWorklet != null) {
jaegonkime0f45b52018-10-09 20:23:26 +0900185
186 log.info("{} worklet.process:{}", context.name(), initWorklet.tag());
187 log.trace("{} context: {}", context.name(), context);
188
189 dataModelInjector.inject(initWorklet, context);
jaegonkim6a7b5242018-09-12 23:09:42 +0900190 initWorklet.process(context);
jaegonkime0f45b52018-10-09 20:23:26 +0900191 dataModelInjector.inhale(initWorklet, context);
192
193 log.info("{} worklet.process(done): {}", context.name(), initWorklet.tag());
194 log.trace("{} context: {}", context.name(), context);
jaegonkim6a7b5242018-09-12 23:09:42 +0900195 }
196
197 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900198 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900199 context.setCause(e.getMessage());
200 context.setState(WorkflowState.EXCEPTION);
201 workplaceStore.commitContext(context.name(), context, false);
202 return;
203 }
204 // trigger the execution of next worklet.
205 workplaceStore.registerContext(context.name(), context);
206 }
207
208 @Override
jaegonkime0f45b52018-10-09 20:23:26 +0900209 public void eval(String contextName) {
210
211 final WorkflowContext latestContext = workplaceStore.getContext(contextName);
212 if (latestContext == null) {
213 log.error("Invalid workflow context {}", contextName);
214 return;
215 }
216
217 initWorkletExecution(latestContext);
218
219 workplaceStore.commitContext(latestContext.name(), latestContext, true);
220 }
221
222 @Override
jaegonkim6a7b5242018-09-12 23:09:42 +0900223 public void eventMapTrigger(Event event, EventHintSupplier supplier) {
224
225 if (event.subject() instanceof SystemWorkflowContext) {
226 return;
227 }
228
229 Map<String, String> eventMap;
230
231 String eventHint;
232 try {
233 eventHint = supplier.apply(event);
234 } catch (Throwable e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900235 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900236 return;
237 }
238 if (eventHint == null) {
239 // do nothing
240 log.error("Invalid eventHint, event: {}", event);
241 return;
242 }
243
244 try {
245 eventMap = eventMapStore.getEventMap(event.getClass().getName(), eventHint);
246 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900247 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900248 return;
249 }
250
251 if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
252 // do nothing;
253 log.debug("Invalid eventMap, event: {}", event);
254 return;
255 }
256
257 for (Map.Entry<String, String> entry : eventMap.entrySet()) {
258 String contextName = entry.getKey();
jaegonkime0f45b52018-10-09 20:23:26 +0900259 String strProgramCounter = entry.getValue();
260 ProgramCounter pc;
261 try {
262 pc = ProgramCounter.valueOf(strProgramCounter);
263 } catch (IllegalArgumentException e) {
264 log.error("Exception: ", e);
265 return;
266 }
267
jaegonkim6a7b5242018-09-12 23:09:42 +0900268 WorkflowContext context = workplaceStore.getContext(contextName);
269 if (Objects.isNull(context)) {
270 log.info("Invalid context: {}, event: {}", contextName, event);
271 continue;
272 }
jaegonkime0f45b52018-10-09 20:23:26 +0900273 EventTask eventtask = null;
274 try {
275 eventtask = EventTask.builder()
jaegonkim6a7b5242018-09-12 23:09:42 +0900276 .event(event)
277 .eventHint(eventHint)
278 .context(context)
jaegonkime0f45b52018-10-09 20:23:26 +0900279 .programCounter(pc)
jaegonkim6a7b5242018-09-12 23:09:42 +0900280 .build();
jaegonkime0f45b52018-10-09 20:23:26 +0900281 } catch (WorkflowException e) {
282 log.error("Exception: ", e);
283 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900284
jaegonkime0f45b52018-10-09 20:23:26 +0900285 log.debug("eventtaskAccumulator.add: task: {}", eventtask);
286 if (!Objects.isNull(eventtask)) {
287 eventtaskAccumulator.add(eventtask);
288 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900289 }
290 }
291
292 @Override
293 public void registerEventMap(Class<? extends Event> eventType, String eventHint,
jaegonkime0f45b52018-10-09 20:23:26 +0900294 String contextName, String programCounterString) throws WorkflowException {
295 eventMapStore.registerEventMap(eventType.getName(), eventHint, contextName, programCounterString);
jaegonkim6a7b5242018-09-12 23:09:42 +0900296 for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
297 Map<String, String> eventMap = eventMapStore.getEventMap(eventType.getName(), eventHint);
298 if (eventMap != null && eventMap.containsKey(contextName)) {
299 return;
300 }
301 try {
302 log.info("sleep {}", i);
303 Thread.sleep(10L * (i + 1));
304 } catch (InterruptedException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900305 log.error("Exception: ", e);
Ray Milkeyfe6afd82018-11-26 14:03:20 -0800306 Thread.currentThread().interrupt();
jaegonkim6a7b5242018-09-12 23:09:42 +0900307 }
308 }
309 }
310
311 @Override
312 protected void post(WorkflowDataEvent event) {
313
314 if (event.subject() == null || !isRelevant(event.subject())) {
315 log.debug("ignore event {}", event);
316 return;
317 }
318
319 // trigger next worklet selection
320 WorkflowData dataModelContainer = event.subject();
321 switch (event.type()) {
322 case INSERT:
323 case UPDATE:
324 if (dataModelContainer.triggerNext()) {
jaegonkime0f45b52018-10-09 20:23:26 +0900325 log.debug("workflowAccumulator.add: {}", dataModelContainer);
jaegonkim6a7b5242018-09-12 23:09:42 +0900326 workflowAccumulator.add(dataModelContainer);
327 } else {
328 log.debug("pass-workflowAccumulator.add: {}", dataModelContainer);
329 }
330 break;
331 case REMOVE:
332 break;
333 default:
334 }
335
336 // trigger EventTask for WorkflowDataEvent
337 eventMapTriggerExecutor.submit(
338 () -> eventMapTrigger(
339 event,
340 // event hint supplier
341 (ev) -> {
342 if (ev == null || ev.subject() == null) {
343 return null;
344 }
345
346 if (ev.subject() instanceof WorkflowData) {
347 return ((WorkflowData) ev.subject()).name();
348 } else {
349 return null;
350 }
351 }
352 )
353 );
354 }
355
356 /**
357 * Checks whether this workflow data job is relevant to this ONOS node.
358 * @param job workflow data
359 * @return checking result
360 */
361 private boolean isRelevant(WorkflowData job) {
362 // distributes event processing with work-partition
363 return partitionService.isMine(job.distributor(), this::stringHash);
364 }
365
366 /**
367 * Gets hash of the string.
368 * @param str string to get a hash
369 * @return hash value
370 */
371 public Long stringHash(String str) {
372 return UUID.nameUUIDFromBytes(str.getBytes()).getMostSignificantBits();
373 }
374
375 /**
376 * Class for handler task batch delegation.
377 */
378 private class InternalHandlerTaskBatchDelegate implements HandlerTaskBatchDelegate {
379 @Override
380 public void execute(Collection<Collection<HandlerTask>> operations) {
381 log.debug("Execute {} operation(s).", operations.size());
382
383 CompletableFuture.runAsync(() -> {
384 List<CompletableFuture<Collection<HandlerTask>>> futures = operations.stream()
385 .map(
386 x -> CompletableFuture.completedFuture(x)
387 .thenApplyAsync(WorkFlowEngine.this::processHandlerTask, handlerTaskExecutor)
388 .exceptionally(e -> null)
389 )
390 .collect(Collectors.toList());
391
392 // waiting the completion of futures
393 futures.parallelStream().forEach(x -> x.join());
394
395 }, handlerTaskBatchExecutor).exceptionally(e -> {
396 log.error("Error submitting batches:", e);
397 return null;
398 }).thenRun(eventtaskAccumulator::ready);
399 }
400 }
401
402 /**
403 * Initializes worklet execution.
404 * @param context workflow context
405 */
406 private void initWorkletExecution(WorkflowContext context) {
407 context.setState(WorkflowState.RUNNING);
408 context.setCause("");
409 context.setWorkflowExecutionService(this);
410 context.setWorkflowStore(workflowStore);
411 context.setWorkplaceStore(workplaceStore);
412 context.waitCompletion(null, null, null, 0L);
413 context.setTriggerNext(false);
414 }
415
416 /**
417 * Processes handler tasks.
418 * @param tasks handler tasks
419 * @return handler tasks processed
420 */
421 private Collection<HandlerTask> processHandlerTask(Collection<HandlerTask> tasks) {
422
423 for (HandlerTask task : tasks) {
424 if (task instanceof EventTimeoutTask) {
425 execEventTimeoutTask((EventTimeoutTask) task);
426 } else if (task instanceof TimeoutTask) {
427 execTimeoutTask((TimeoutTask) task);
428 } else if (task instanceof EventTask) {
429 execEventTask((EventTask) task);
430 } else {
431 log.error("Unsupported handler task {}", task);
432 }
433 }
434
435 return null;
436 }
437
438 /**
439 * Executes event task.
440 * @param task event task
441 * @return event task
442 */
443 private EventTask execEventTask(EventTask task) {
444
445 Map<String, String> eventMap = null;
446 try {
447 eventMap = eventMapStore.getEventMap(task.event().getClass().getName(), task.eventHint());
448 } catch (WorkflowException e) {
449 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
450 return task;
451 }
452
453 if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
454 return task;
455 }
456
457 if (Objects.isNull(eventMap.get(task.context().name()))) {
458 return task;
459 }
460
461 log.debug("execEventTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
462
463 WorkflowContext context = (WorkflowContext) (task.context());
464 Workflow workflow = workflowStore.get(context.workflowId());
465 if (workflow == null) {
466 log.error("Invalid workflow {}", context.workflowId());
467 return task;
468 }
469
470 WorkflowContext latestContext = workplaceStore.getContext(context.name());
471 if (latestContext == null) {
472 log.error("Invalid workflow context {}", context.name());
473 return task;
474 }
475
476 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900477 if (!Objects.equals(latestContext.current(), task.programCounter())) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900478 log.error("Current worklet({}) is not mismatched with task work({}). Ignored.",
jaegonkime0f45b52018-10-09 20:23:26 +0900479 latestContext.current(), task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900480 return task;
481 }
482
jaegonkime0f45b52018-10-09 20:23:26 +0900483 Worklet worklet = workflow.getWorkletInstance(task.programCounter().workletType());
484 if (Worklet.Common.COMPLETED.equals(worklet) || Worklet.Common.INIT.equals(worklet)) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900485 log.error("Current worklet is {}, Ignored", worklet);
486 return task;
487 }
488
489 initWorkletExecution(latestContext);
490
jaegonkime0f45b52018-10-09 20:23:26 +0900491 log.info("{} worklet.isCompleted:{}", latestContext.name(), worklet.tag());
492 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
493
494 dataModelInjector.inject(worklet, latestContext);
495 boolean completed = worklet.isCompleted(latestContext, task.event());
496 dataModelInjector.inhale(worklet, latestContext);
497
498 if (completed) {
499
500 log.info("{} worklet.isCompleted(true):{}", latestContext.name(), worklet.tag());
501 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
502
jaegonkim6a7b5242018-09-12 23:09:42 +0900503 eventMapStore.unregisterEventMap(
504 task.eventType(), task.eventHint(), latestContext.name());
505
jaegonkime0f45b52018-10-09 20:23:26 +0900506 //completed case
507 // increase program counter
508 ProgramCounter pc = latestContext.current();
509 latestContext.setCurrent(workflow.increased(pc));
510
jaegonkim6a7b5242018-09-12 23:09:42 +0900511 workplaceStore.commitContext(latestContext.name(), latestContext, true);
512 return null;
513 } else {
jaegonkime0f45b52018-10-09 20:23:26 +0900514
515 log.info("{} worklet.isCompleted(false):{}", latestContext.name(), worklet.tag());
516 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
517
jaegonkim6a7b5242018-09-12 23:09:42 +0900518 workplaceStore.commitContext(latestContext.name(), latestContext, false);
519 }
520
521 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900522 log.error("Exception: ", e);
523 latestContext.setCause(e.getMessage());
524 latestContext.setState(WorkflowState.EXCEPTION);
525 workplaceStore.commitContext(latestContext.name(), latestContext, false);
526 } catch (StorageException e) {
527 log.error("Exception: ", e);
528 // StorageException does not commit context.
529 } catch (Exception e) {
530 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900531 latestContext.setCause(e.getMessage());
532 latestContext.setState(WorkflowState.EXCEPTION);
533 workplaceStore.commitContext(latestContext.name(), latestContext, false);
534 }
535
536 return task;
537 }
538
539 /**
540 * Executes event timeout task.
541 * @param task event timeout task
542 * @return handler task
543 */
544 private HandlerTask execEventTimeoutTask(EventTimeoutTask task) {
545
546 Map<String, String> eventMap = null;
547 try {
548 eventMap = eventMapStore.getEventMap(task.eventType(), task.eventHint());
549 } catch (WorkflowException e) {
550 log.error("execEventTimeoutTask: Exception: {}, trace: {}",
551 e, Lists.newArrayList(e.getStackTrace()));
552 return task;
553 }
554
555 if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
556 return task;
557 }
558
559 if (Objects.isNull(eventMap.get(task.context().name()))) {
560 return task;
561 }
562
563 log.debug("execEventTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
564
jaegonkime0f45b52018-10-09 20:23:26 +0900565 WorkflowContext context = (WorkflowContext) (task.context());
jaegonkim6a7b5242018-09-12 23:09:42 +0900566 Workflow workflow = workflowStore.get(context.workflowId());
567 if (workflow == null) {
568 log.error("execEventTimeoutTask: Invalid workflow {}", context.workflowId());
569 return task;
570 }
571
572 WorkflowContext latestContext = workplaceStore.getContext(context.name());
573 if (latestContext == null) {
574 log.error("execEventTimeoutTask: Invalid workflow context {}", context.name());
575 return task;
576 }
577
578 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900579 if (!Objects.equals(latestContext.current(), task.programCounter())) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900580 log.error("execEventTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
jaegonkime0f45b52018-10-09 20:23:26 +0900581 latestContext.current(), task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900582 return task;
583 }
584
jaegonkime0f45b52018-10-09 20:23:26 +0900585 Worklet worklet = workflow.getWorkletInstance(task.programCounter().workletType());
jaegonkim6a7b5242018-09-12 23:09:42 +0900586 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
587 log.error("execEventTimeoutTask: Current worklet is {}, Ignored", worklet);
588 return task;
589 }
590
591 initWorkletExecution(latestContext);
592
jaegonkim6a7b5242018-09-12 23:09:42 +0900593 eventMapStore.unregisterEventMap(
594 task.eventType(), task.eventHint(), latestContext.name());
595
jaegonkime0f45b52018-10-09 20:23:26 +0900596 log.info("{} worklet.timeout(for event):{}", latestContext.name(), worklet.tag());
597 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
598
599 dataModelInjector.inject(worklet, latestContext);
jaegonkim6a7b5242018-09-12 23:09:42 +0900600 worklet.timeout(latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900601 dataModelInjector.inhale(worklet, latestContext);
602
603 log.info("{} worklet.timeout(for event)(done):{}", latestContext.name(), worklet.tag());
604 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
605
606
jaegonkim6a7b5242018-09-12 23:09:42 +0900607 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
608
609 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900610 log.error("Exception: ", e);
611 latestContext.setCause(e.getMessage());
612 latestContext.setState(WorkflowState.EXCEPTION);
613 workplaceStore.commitContext(latestContext.name(), latestContext, false);
614 } catch (StorageException e) {
615 log.error("Exception: ", e);
616 // StorageException does not commit context.
617 } catch (Exception e) {
618 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900619 latestContext.setCause(e.getMessage());
620 latestContext.setState(WorkflowState.EXCEPTION);
621 workplaceStore.commitContext(latestContext.name(), latestContext, false);
622 }
623
624 return task;
625 }
626
627 /**
628 * Executes timeout task.
629 * @param task time out task
630 * @return handler task
631 */
632 private HandlerTask execTimeoutTask(TimeoutTask task) {
633
634 log.debug("execTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
635
636 WorkflowContext context = (WorkflowContext) (task.context());
637 Workflow workflow = workflowStore.get(context.workflowId());
638 if (workflow == null) {
639 log.error("execTimeoutTask: Invalid workflow {}", context.workflowId());
640 return task;
641 }
642
643 WorkflowContext latestContext = workplaceStore.getContext(context.name());
644 if (latestContext == null) {
645 log.error("execTimeoutTask: Invalid workflow context {}", context.name());
646 return task;
647 }
648
649 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900650 if (!Objects.equals(latestContext.current(), task.programCounter())) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900651 log.error("execTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
jaegonkime0f45b52018-10-09 20:23:26 +0900652 latestContext.current(), task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900653 return task;
654 }
655
jaegonkime0f45b52018-10-09 20:23:26 +0900656 Worklet worklet = workflow.getWorkletInstance(task.programCounter().workletType());
jaegonkim6a7b5242018-09-12 23:09:42 +0900657 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
658 log.error("execTimeoutTask: Current worklet is {}, Ignored", worklet);
659 return task;
660 }
661
662 initWorkletExecution(latestContext);
663
jaegonkime0f45b52018-10-09 20:23:26 +0900664 log.info("{} worklet.timeout:{}", latestContext.name(), worklet.tag());
665 log.trace("{} context: {}", latestContext.name(), latestContext);
666
667 dataModelInjector.inject(worklet, latestContext);
jaegonkim6a7b5242018-09-12 23:09:42 +0900668 worklet.timeout(latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900669 dataModelInjector.inhale(worklet, latestContext);
670
671 log.info("{} worklet.timeout(done):{}", latestContext.name(), worklet.tag());
672 log.trace("{} context: {}", latestContext.name(), latestContext);
673
jaegonkim6a7b5242018-09-12 23:09:42 +0900674 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
675
676 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900677 log.error("Exception: ", e);
678 latestContext.setCause(e.getMessage());
679 latestContext.setState(WorkflowState.EXCEPTION);
680 workplaceStore.commitContext(latestContext.name(), latestContext, false);
681 } catch (StorageException e) {
682 log.error("Exception: ", e);
683 // StorageException does not commit context.
684 } catch (Exception e) {
685 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900686 latestContext.setCause(e.getMessage());
687 latestContext.setState(WorkflowState.EXCEPTION);
688 workplaceStore.commitContext(latestContext.name(), latestContext, false);
689 }
690
691 return task;
692 }
693
694 /**
695 * Class for delegation of workflow batch execution.
696 */
697 private class InternalWorkflowBatchDelegate implements WorkflowBatchDelegate {
698 @Override
699 public void execute(Collection<WorkflowData> operations) {
700 log.debug("Execute {} operation(s).", operations.size());
701
702 CompletableFuture.runAsync(() -> {
703 List<CompletableFuture<WorkflowData>> futures = operations.stream()
704 .map(
705 x -> CompletableFuture.completedFuture(x)
706 .thenApplyAsync(WorkFlowEngine.this::execWorkflow, workflowExecutor)
707 .exceptionally(e -> null)
708 )
709 .collect(Collectors.toList());
710
711 // waiting the completion of futures
712 futures.parallelStream().forEach(x -> x.join());
713
714 }, workflowBatchExecutor).exceptionally(e -> {
715 log.error("Error submitting batches:", e);
716 return null;
717 }).thenRun(workflowAccumulator::ready);
718 }
719 }
720
721 /**
722 * Executes workflow.
723 * @param dataModelContainer workflow data model container(workflow or workplace)
724 * @return
725 */
726 private WorkflowData execWorkflow(WorkflowData dataModelContainer) {
727 if (dataModelContainer instanceof WorkflowContext) {
728 return execWorkflowContext((WorkflowContext) dataModelContainer);
729 } else if (dataModelContainer instanceof Workplace) {
730 return execWorkplace((Workplace) dataModelContainer);
731 } else {
732 log.error("Invalid context {}", dataModelContainer);
733 return null;
734 }
735 }
736
737 /**
738 * Executes workflow context.
739 * @param context workflow context
740 * @return workflow context
741 */
742 private WorkflowContext execWorkflowContext(WorkflowContext context) {
743
744 Workflow workflow = workflowStore.get(context.workflowId());
745 if (workflow == null) {
746 log.error("Invalid workflow {}", context.workflowId());
747 return null;
748 }
749
750 final WorkflowContext latestContext = workplaceStore.getContext(context.name());
751 if (latestContext == null) {
752 log.error("Invalid workflow context {}", context.name());
753 return null;
754 }
755
756 initWorkletExecution(latestContext);
757
758 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900759 final ProgramCounter pc = workflow.next(latestContext);
760 final Worklet worklet = workflow.getWorkletInstance(pc.workletType());
jaegonkim6a7b5242018-09-12 23:09:42 +0900761
762 if (worklet == Worklet.Common.INIT) {
763 log.error("workflow.next gave INIT. It cannot be executed (context: {})", context.name());
764 return latestContext;
765 }
766
jaegonkime0f45b52018-10-09 20:23:26 +0900767 latestContext.setCurrent(pc);
jaegonkim6a7b5242018-09-12 23:09:42 +0900768 if (worklet == Worklet.Common.COMPLETED) {
769
770 if (workflow.attributes().contains(REMOVE_AFTER_COMPLETE)) {
771 workplaceStore.removeContext(latestContext.name());
772 return null;
773 } else {
774 latestContext.setState(WorkflowState.IDLE);
775 workplaceStore.commitContext(latestContext.name(), latestContext, false);
776 return latestContext;
777 }
778 }
779
jaegonkime0f45b52018-10-09 20:23:26 +0900780 log.info("{} worklet.process:{}", latestContext.name(), worklet.tag());
781 log.trace("{} context: {}", latestContext.name(), latestContext);
782
783 dataModelInjector.inject(worklet, latestContext);
jaegonkim6a7b5242018-09-12 23:09:42 +0900784 worklet.process(latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900785 dataModelInjector.inhale(worklet, latestContext);
786
787 log.info("{} worklet.process(done): {}", latestContext.name(), worklet.tag());
788 log.trace("{} context: {}", latestContext.name(), latestContext);
789
jaegonkim6a7b5242018-09-12 23:09:42 +0900790
791 if (latestContext.completionEventType() != null) {
792 if (latestContext.completionEventGenerator() == null) {
793 String msg = String.format("Invalid exepecting event(%s), generator(%s)",
794 latestContext.completionEventType(),
795 latestContext.completionEventGenerator());
796 throw new WorkflowException(msg);
797 }
798
799 registerEventMap(latestContext.completionEventType(), latestContext.completionEventHint(),
jaegonkime0f45b52018-10-09 20:23:26 +0900800 latestContext.name(), pc.toString());
jaegonkim6a7b5242018-09-12 23:09:42 +0900801
802 latestContext.completionEventGenerator().apply();
803
804 if (latestContext.completionEventTimeout() != 0L) {
805 final EventTimeoutTask eventTimeoutTask = EventTimeoutTask.builder()
806 .context(latestContext)
jaegonkime0f45b52018-10-09 20:23:26 +0900807 .programCounter(pc)
jaegonkim6a7b5242018-09-12 23:09:42 +0900808 .eventType(latestContext.completionEventType().getName())
809 .eventHint(latestContext.completionEventHint())
810 .build();
811 timerChain.schedule(latestContext.completionEventTimeout(),
812 () -> {
813 eventtaskAccumulator.add(eventTimeoutTask);
814 });
815 }
816 } else {
817 if (latestContext.completionEventTimeout() != 0L) {
818 final TimeoutTask timeoutTask = TimeoutTask.builder()
819 .context(latestContext)
jaegonkime0f45b52018-10-09 20:23:26 +0900820 .programCounter(pc)
jaegonkim6a7b5242018-09-12 23:09:42 +0900821 .build();
822
823 timerChain.schedule(latestContext.completionEventTimeout(),
824 () -> {
825 eventtaskAccumulator.add(timeoutTask);
826 });
jaegonkime0f45b52018-10-09 20:23:26 +0900827 } else {
828 //completed case
829 // increase program counter
830 latestContext.setCurrent(workflow.increased(pc));
jaegonkim6a7b5242018-09-12 23:09:42 +0900831 }
832 }
833
834 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
835
836 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900837 log.error("Exception: ", e);
838 latestContext.setCause(e.getMessage());
839 latestContext.setState(WorkflowState.EXCEPTION);
840 workplaceStore.commitContext(latestContext.name(), latestContext, false);
841 } catch (StorageException e) {
842 log.error("Exception: ", e);
843 // StorageException does not commit context.
844 } catch (Exception e) {
845 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900846 latestContext.setCause(e.getMessage());
847 latestContext.setState(WorkflowState.EXCEPTION);
848 workplaceStore.commitContext(latestContext.name(), latestContext, false);
849 }
850
851 return latestContext;
852 }
853
854 /**
855 * Execute workplace.
856 * @param workplace workplace
857 * @return workplace
858 */
859 private Workplace execWorkplace(Workplace workplace) {
860
861 return null;
862 }
863
jaegonkime0f45b52018-10-09 20:23:26 +0900864}