/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.workflow.impl;

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.google.common.collect.Lists;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.workflow.api.DefaultWorkplace;
import org.onosproject.workflow.api.EventHintSupplier;
import org.onosproject.workflow.api.EventTask;
import org.onosproject.workflow.api.JsonDataModelTree;
import org.onosproject.workflow.api.SystemWorkflowContext;
import org.onosproject.workflow.api.EventTimeoutTask;
import org.onosproject.workflow.api.TimeoutTask;
import org.onosproject.workflow.api.TimerChain;
import org.onosproject.workflow.api.Worklet;
import org.onosproject.workflow.api.Workflow;
import org.onosproject.workflow.api.WorkflowContext;
import org.onosproject.workflow.api.WorkflowData;
import org.onosproject.workflow.api.ContextEventMapStore;
import org.onosproject.workflow.api.WorkflowState;
import org.onosproject.workflow.api.WorkflowStore;
import org.onosproject.workflow.api.WorkflowBatchDelegate;
import org.onosproject.workflow.api.WorkflowDataEvent;
import org.onosproject.workflow.api.WorkflowDataListener;
import org.onosproject.workflow.api.WorkflowException;
import org.onosproject.workflow.api.HandlerTask;
import org.onosproject.workflow.api.HandlerTaskBatchDelegate;
import org.onosproject.workflow.api.Workplace;
import org.onosproject.workflow.api.WorkplaceStore;
import org.onosproject.workflow.api.WorkplaceStoreDelegate;
import org.onosproject.workflow.api.WorkflowExecutionService;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.event.Event;
import org.onosproject.net.intent.WorkPartitionService;
import org.slf4j.Logger;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.workflow.api.WorkflowAttribute.REMOVE_AFTER_COMPLETE;
import static org.slf4j.LoggerFactory.getLogger;

78@Component(immediate = true)
79@Service
80public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, WorkflowDataListener>
81 implements WorkflowExecutionService {
82
83 protected static final Logger log = getLogger(WorkFlowEngine.class);
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected CoreService coreService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected ClusterService clusterService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 protected LeadershipService leadershipService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected WorkPartitionService partitionService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected WorkplaceStore workplaceStore;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected WorkflowStore workflowStore;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected ContextEventMapStore eventMapStore;
104
105 private final WorkplaceStoreDelegate workplaceStoreDelegate = this::post;
106
107 private final WorkflowBatchDelegate workflowBatchDelegate = new InternalWorkflowBatchDelegate();
108 private final WorkflowAccumulator workflowAccumulator = new WorkflowAccumulator(workflowBatchDelegate);
109
110 private final HandlerTaskBatchDelegate eventtaskBatchDelegate = new InternalHandlerTaskBatchDelegate();
111 private final HandlerTaskAccumulator eventtaskAccumulator = new HandlerTaskAccumulator(eventtaskBatchDelegate);
112
113 private ExecutorService workflowBatchExecutor;
114 private ExecutorService workflowExecutor;
115
116 private ExecutorService handlerTaskBatchExecutor;
117 private ExecutorService handlerTaskExecutor;
118
119 private static final int DEFAULT_WORKFLOW_THREADS = 12;
120 private static final int DEFAULT_EVENTTASK_THREADS = 12;
121 private static final int MAX_REGISTER_EVENTMAP_WAITS = 10;
122
123 private ScheduledExecutorService eventMapTriggerExecutor;
124
125 private TimerChain timerChain = new TimerChain();
126
127 public static final String APPID = "org.onosproject.workflow";
128 private ApplicationId appId;
129 private NodeId localNodeId;
130
131 @Activate
132 public void activate() {
133 appId = coreService.registerApplication(APPID);
134 workplaceStore.setDelegate(workplaceStoreDelegate);
135 localNodeId = clusterService.getLocalNode().id();
136 leadershipService.runForLeadership(appId.name());
137
138 workflowBatchExecutor = newSingleThreadExecutor(
139 groupedThreads("onos/workflow", "workflow-batch", log));
140 workflowExecutor = newFixedThreadPool(DEFAULT_WORKFLOW_THREADS,
141 groupedThreads("onos/workflow-exec", "worker-%d", log));
142 handlerTaskBatchExecutor = newSingleThreadExecutor(
143 groupedThreads("onos/workflow", "handlertask-batch", log));
144 handlerTaskExecutor = newFixedThreadPool(DEFAULT_EVENTTASK_THREADS,
145 groupedThreads("onos/handlertask-exec", "worker-%d", log));
146 eventMapTriggerExecutor = newSingleThreadScheduledExecutor(
147 groupedThreads("onos/workflow-engine", "eventmap-trigger-executor"));
148
149 (new WorkplaceWorkflow(this, workflowStore)).registerWorkflows();
150 JsonDataModelTree data = new JsonDataModelTree(JsonNodeFactory.instance.objectNode());
151 workplaceStore.registerWorkplace(Workplace.SYSTEM_WORKPLACE,
152 new DefaultWorkplace(Workplace.SYSTEM_WORKPLACE, data));
153
154 log.info("Started");
155 }
156
157 @Deactivate
158 public void deactivate() {
159 leadershipService.withdraw(appId.name());
160 workplaceStore.unsetDelegate(workplaceStoreDelegate);
161 workflowBatchExecutor.shutdown();
162 workflowExecutor.shutdown();
163 handlerTaskBatchExecutor.shutdown();
164 handlerTaskExecutor.shutdown();
165 eventMapTriggerExecutor.shutdown();
166 log.info("Stopped");
167 }
168
169 @Override
170 public void execInitWorklet(WorkflowContext context) {
171
172 Workflow workflow = workflowStore.get(context.workflowId());
173 if (workflow == null) {
174 log.error("Invalid workflow {}", context.workflowId());
175 return;
176 }
177
178 initWorkletExecution(context);
179 try {
180 Worklet initWorklet = workflow.init(context);
181 if (initWorklet != null) {
182 initWorklet.process(context);
183 }
184
185 } catch (WorkflowException e) {
186 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
187 context.setCause(e.getMessage());
188 context.setState(WorkflowState.EXCEPTION);
189 workplaceStore.commitContext(context.name(), context, false);
190 return;
191 }
192 // trigger the execution of next worklet.
193 workplaceStore.registerContext(context.name(), context);
194 }
195
196 @Override
197 public void eventMapTrigger(Event event, EventHintSupplier supplier) {
198
199 if (event.subject() instanceof SystemWorkflowContext) {
200 return;
201 }
202
203 Map<String, String> eventMap;
204
205 String eventHint;
206 try {
207 eventHint = supplier.apply(event);
208 } catch (Throwable e) {
209 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
210 return;
211 }
212 if (eventHint == null) {
213 // do nothing
214 log.error("Invalid eventHint, event: {}", event);
215 return;
216 }
217
218 try {
219 eventMap = eventMapStore.getEventMap(event.getClass().getName(), eventHint);
220 } catch (WorkflowException e) {
221 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
222 return;
223 }
224
225 if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
226 // do nothing;
227 log.debug("Invalid eventMap, event: {}", event);
228 return;
229 }
230
231 for (Map.Entry<String, String> entry : eventMap.entrySet()) {
232 String contextName = entry.getKey();
233 String workletType = entry.getValue();
234 WorkflowContext context = workplaceStore.getContext(contextName);
235 if (Objects.isNull(context)) {
236 log.info("Invalid context: {}, event: {}", contextName, event);
237 continue;
238 }
239 EventTask eventtask = EventTask.builder()
240 .event(event)
241 .eventHint(eventHint)
242 .context(context)
243 .workletType(workletType)
244 .build();
245
246 log.info("eventtaskAccumulator.add: task: {}", eventtask);
247 eventtaskAccumulator.add(eventtask);
248 }
249 }
250
251 @Override
252 public void registerEventMap(Class<? extends Event> eventType, String eventHint,
253 String contextName, String workletType) throws WorkflowException {
254 eventMapStore.registerEventMap(eventType.getName(), eventHint, contextName, workletType);
255 for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
256 Map<String, String> eventMap = eventMapStore.getEventMap(eventType.getName(), eventHint);
257 if (eventMap != null && eventMap.containsKey(contextName)) {
258 return;
259 }
260 try {
261 log.info("sleep {}", i);
262 Thread.sleep(10L * (i + 1));
263 } catch (InterruptedException e) {
264 e.printStackTrace();
265 }
266 }
267 }
268
269 @Override
270 protected void post(WorkflowDataEvent event) {
271
272 if (event.subject() == null || !isRelevant(event.subject())) {
273 log.debug("ignore event {}", event);
274 return;
275 }
276
277 // trigger next worklet selection
278 WorkflowData dataModelContainer = event.subject();
279 switch (event.type()) {
280 case INSERT:
281 case UPDATE:
282 if (dataModelContainer.triggerNext()) {
283 log.info("workflowAccumulator.add: {}", dataModelContainer);
284 workflowAccumulator.add(dataModelContainer);
285 } else {
286 log.debug("pass-workflowAccumulator.add: {}", dataModelContainer);
287 }
288 break;
289 case REMOVE:
290 break;
291 default:
292 }
293
294 // trigger EventTask for WorkflowDataEvent
295 eventMapTriggerExecutor.submit(
296 () -> eventMapTrigger(
297 event,
298 // event hint supplier
299 (ev) -> {
300 if (ev == null || ev.subject() == null) {
301 return null;
302 }
303
304 if (ev.subject() instanceof WorkflowData) {
305 return ((WorkflowData) ev.subject()).name();
306 } else {
307 return null;
308 }
309 }
310 )
311 );
312 }
313
314 /**
315 * Checks whether this workflow data job is relevant to this ONOS node.
316 * @param job workflow data
317 * @return checking result
318 */
319 private boolean isRelevant(WorkflowData job) {
320 // distributes event processing with work-partition
321 return partitionService.isMine(job.distributor(), this::stringHash);
322 }
323
324 /**
325 * Gets hash of the string.
326 * @param str string to get a hash
327 * @return hash value
328 */
329 public Long stringHash(String str) {
330 return UUID.nameUUIDFromBytes(str.getBytes()).getMostSignificantBits();
331 }
332
333 /**
334 * Class for handler task batch delegation.
335 */
336 private class InternalHandlerTaskBatchDelegate implements HandlerTaskBatchDelegate {
337 @Override
338 public void execute(Collection<Collection<HandlerTask>> operations) {
339 log.debug("Execute {} operation(s).", operations.size());
340
341 CompletableFuture.runAsync(() -> {
342 List<CompletableFuture<Collection<HandlerTask>>> futures = operations.stream()
343 .map(
344 x -> CompletableFuture.completedFuture(x)
345 .thenApplyAsync(WorkFlowEngine.this::processHandlerTask, handlerTaskExecutor)
346 .exceptionally(e -> null)
347 )
348 .collect(Collectors.toList());
349
350 // waiting the completion of futures
351 futures.parallelStream().forEach(x -> x.join());
352
353 }, handlerTaskBatchExecutor).exceptionally(e -> {
354 log.error("Error submitting batches:", e);
355 return null;
356 }).thenRun(eventtaskAccumulator::ready);
357 }
358 }
359
360 /**
361 * Initializes worklet execution.
362 * @param context workflow context
363 */
364 private void initWorkletExecution(WorkflowContext context) {
365 context.setState(WorkflowState.RUNNING);
366 context.setCause("");
367 context.setWorkflowExecutionService(this);
368 context.setWorkflowStore(workflowStore);
369 context.setWorkplaceStore(workplaceStore);
370 context.waitCompletion(null, null, null, 0L);
371 context.setTriggerNext(false);
372 }
373
374 /**
375 * Processes handler tasks.
376 * @param tasks handler tasks
377 * @return handler tasks processed
378 */
379 private Collection<HandlerTask> processHandlerTask(Collection<HandlerTask> tasks) {
380
381 for (HandlerTask task : tasks) {
382 if (task instanceof EventTimeoutTask) {
383 execEventTimeoutTask((EventTimeoutTask) task);
384 } else if (task instanceof TimeoutTask) {
385 execTimeoutTask((TimeoutTask) task);
386 } else if (task instanceof EventTask) {
387 execEventTask((EventTask) task);
388 } else {
389 log.error("Unsupported handler task {}", task);
390 }
391 }
392
393 return null;
394 }
395
396 /**
397 * Executes event task.
398 * @param task event task
399 * @return event task
400 */
401 private EventTask execEventTask(EventTask task) {
402
403 Map<String, String> eventMap = null;
404 try {
405 eventMap = eventMapStore.getEventMap(task.event().getClass().getName(), task.eventHint());
406 } catch (WorkflowException e) {
407 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
408 return task;
409 }
410
411 if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
412 return task;
413 }
414
415 if (Objects.isNull(eventMap.get(task.context().name()))) {
416 return task;
417 }
418
419 log.debug("execEventTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
420
421 WorkflowContext context = (WorkflowContext) (task.context());
422 Workflow workflow = workflowStore.get(context.workflowId());
423 if (workflow == null) {
424 log.error("Invalid workflow {}", context.workflowId());
425 return task;
426 }
427
428 WorkflowContext latestContext = workplaceStore.getContext(context.name());
429 if (latestContext == null) {
430 log.error("Invalid workflow context {}", context.name());
431 return task;
432 }
433
434 try {
435 Worklet worklet = workflow.getWorkletInstance(task.workletType());
436 if (!Objects.equals(latestContext.current(), worklet.tag())) {
437 log.error("Current worklet({}) is not mismatched with task work({}). Ignored.",
438 latestContext.current(), worklet.tag());
439 return task;
440 }
441
442 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
443 log.error("Current worklet is {}, Ignored", worklet);
444 return task;
445 }
446
447 initWorkletExecution(latestContext);
448
449 log.info("processHandlerTask.isCompleted-task:{}, latest:{}", task, latestContext);
450 if (worklet.isCompleted(latestContext, task.event())) {
451 eventMapStore.unregisterEventMap(
452 task.eventType(), task.eventHint(), latestContext.name());
453
454 workplaceStore.commitContext(latestContext.name(), latestContext, true);
455 return null;
456 } else {
457 workplaceStore.commitContext(latestContext.name(), latestContext, false);
458 }
459
460 } catch (WorkflowException e) {
461 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
462 latestContext.setCause(e.getMessage());
463 latestContext.setState(WorkflowState.EXCEPTION);
464 workplaceStore.commitContext(latestContext.name(), latestContext, false);
465 }
466
467 return task;
468 }
469
470 /**
471 * Executes event timeout task.
472 * @param task event timeout task
473 * @return handler task
474 */
475 private HandlerTask execEventTimeoutTask(EventTimeoutTask task) {
476
477 Map<String, String> eventMap = null;
478 try {
479 eventMap = eventMapStore.getEventMap(task.eventType(), task.eventHint());
480 } catch (WorkflowException e) {
481 log.error("execEventTimeoutTask: Exception: {}, trace: {}",
482 e, Lists.newArrayList(e.getStackTrace()));
483 return task;
484 }
485
486 if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
487 return task;
488 }
489
490 if (Objects.isNull(eventMap.get(task.context().name()))) {
491 return task;
492 }
493
494 log.debug("execEventTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
495
496 WorkflowContext context = task.context();
497 Workflow workflow = workflowStore.get(context.workflowId());
498 if (workflow == null) {
499 log.error("execEventTimeoutTask: Invalid workflow {}", context.workflowId());
500 return task;
501 }
502
503 WorkflowContext latestContext = workplaceStore.getContext(context.name());
504 if (latestContext == null) {
505 log.error("execEventTimeoutTask: Invalid workflow context {}", context.name());
506 return task;
507 }
508
509 try {
510 Worklet worklet = workflow.getWorkletInstance(task.workletType());
511 if (!Objects.equals(latestContext.current(), worklet.tag())) {
512 log.error("execEventTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
513 latestContext.current(), worklet.tag());
514 return task;
515 }
516
517 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
518 log.error("execEventTimeoutTask: Current worklet is {}, Ignored", worklet);
519 return task;
520 }
521
522 initWorkletExecution(latestContext);
523
524 log.info("execEventTimeoutTask.timeout-task:{}, latest:{}", task, latestContext);
525 eventMapStore.unregisterEventMap(
526 task.eventType(), task.eventHint(), latestContext.name());
527
528 worklet.timeout(latestContext);
529 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
530
531 } catch (WorkflowException e) {
532 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
533 latestContext.setCause(e.getMessage());
534 latestContext.setState(WorkflowState.EXCEPTION);
535 workplaceStore.commitContext(latestContext.name(), latestContext, false);
536 }
537
538 return task;
539 }
540
541 /**
542 * Executes timeout task.
543 * @param task time out task
544 * @return handler task
545 */
546 private HandlerTask execTimeoutTask(TimeoutTask task) {
547
548 log.debug("execTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
549
550 WorkflowContext context = (WorkflowContext) (task.context());
551 Workflow workflow = workflowStore.get(context.workflowId());
552 if (workflow == null) {
553 log.error("execTimeoutTask: Invalid workflow {}", context.workflowId());
554 return task;
555 }
556
557 WorkflowContext latestContext = workplaceStore.getContext(context.name());
558 if (latestContext == null) {
559 log.error("execTimeoutTask: Invalid workflow context {}", context.name());
560 return task;
561 }
562
563 try {
564 Worklet worklet = workflow.getWorkletInstance(task.workletType());
565 if (!Objects.equals(latestContext.current(), worklet.tag())) {
566 log.error("execTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
567 latestContext.current(), worklet.tag());
568 return task;
569 }
570
571 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
572 log.error("execTimeoutTask: Current worklet is {}, Ignored", worklet);
573 return task;
574 }
575
576 initWorkletExecution(latestContext);
577
578 worklet.timeout(latestContext);
579 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
580
581 } catch (WorkflowException e) {
582 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
583 latestContext.setCause(e.getMessage());
584 latestContext.setState(WorkflowState.EXCEPTION);
585 workplaceStore.commitContext(latestContext.name(), latestContext, false);
586 }
587
588 return task;
589 }
590
591 /**
592 * Class for delegation of workflow batch execution.
593 */
594 private class InternalWorkflowBatchDelegate implements WorkflowBatchDelegate {
595 @Override
596 public void execute(Collection<WorkflowData> operations) {
597 log.debug("Execute {} operation(s).", operations.size());
598
599 CompletableFuture.runAsync(() -> {
600 List<CompletableFuture<WorkflowData>> futures = operations.stream()
601 .map(
602 x -> CompletableFuture.completedFuture(x)
603 .thenApplyAsync(WorkFlowEngine.this::execWorkflow, workflowExecutor)
604 .exceptionally(e -> null)
605 )
606 .collect(Collectors.toList());
607
608 // waiting the completion of futures
609 futures.parallelStream().forEach(x -> x.join());
610
611 }, workflowBatchExecutor).exceptionally(e -> {
612 log.error("Error submitting batches:", e);
613 return null;
614 }).thenRun(workflowAccumulator::ready);
615 }
616 }
617
618 /**
619 * Executes workflow.
620 * @param dataModelContainer workflow data model container(workflow or workplace)
621 * @return
622 */
623 private WorkflowData execWorkflow(WorkflowData dataModelContainer) {
624 if (dataModelContainer instanceof WorkflowContext) {
625 return execWorkflowContext((WorkflowContext) dataModelContainer);
626 } else if (dataModelContainer instanceof Workplace) {
627 return execWorkplace((Workplace) dataModelContainer);
628 } else {
629 log.error("Invalid context {}", dataModelContainer);
630 return null;
631 }
632 }
633
634 /**
635 * Executes workflow context.
636 * @param context workflow context
637 * @return workflow context
638 */
639 private WorkflowContext execWorkflowContext(WorkflowContext context) {
640
641 Workflow workflow = workflowStore.get(context.workflowId());
642 if (workflow == null) {
643 log.error("Invalid workflow {}", context.workflowId());
644 return null;
645 }
646
647 final WorkflowContext latestContext = workplaceStore.getContext(context.name());
648 if (latestContext == null) {
649 log.error("Invalid workflow context {}", context.name());
650 return null;
651 }
652
653 initWorkletExecution(latestContext);
654
655 try {
656 final Worklet worklet = workflow.next(latestContext);
657
658 if (worklet == Worklet.Common.INIT) {
659 log.error("workflow.next gave INIT. It cannot be executed (context: {})", context.name());
660 return latestContext;
661 }
662
663 latestContext.setCurrent(worklet);
664 if (worklet == Worklet.Common.COMPLETED) {
665
666 if (workflow.attributes().contains(REMOVE_AFTER_COMPLETE)) {
667 workplaceStore.removeContext(latestContext.name());
668 return null;
669 } else {
670 latestContext.setState(WorkflowState.IDLE);
671 workplaceStore.commitContext(latestContext.name(), latestContext, false);
672 return latestContext;
673 }
674 }
675
676 log.info("execWorkflowContext.process:{}, {}", worklet.tag(), latestContext);
677 worklet.process(latestContext);
678
679 if (latestContext.completionEventType() != null) {
680 if (latestContext.completionEventGenerator() == null) {
681 String msg = String.format("Invalid exepecting event(%s), generator(%s)",
682 latestContext.completionEventType(),
683 latestContext.completionEventGenerator());
684 throw new WorkflowException(msg);
685 }
686
687 registerEventMap(latestContext.completionEventType(), latestContext.completionEventHint(),
688 latestContext.name(), worklet.tag());
689
690 latestContext.completionEventGenerator().apply();
691
692 if (latestContext.completionEventTimeout() != 0L) {
693 final EventTimeoutTask eventTimeoutTask = EventTimeoutTask.builder()
694 .context(latestContext)
695 .workletType(worklet.tag())
696 .eventType(latestContext.completionEventType().getName())
697 .eventHint(latestContext.completionEventHint())
698 .build();
699 timerChain.schedule(latestContext.completionEventTimeout(),
700 () -> {
701 eventtaskAccumulator.add(eventTimeoutTask);
702 });
703 }
704 } else {
705 if (latestContext.completionEventTimeout() != 0L) {
706 final TimeoutTask timeoutTask = TimeoutTask.builder()
707 .context(latestContext)
708 .workletType(worklet.tag())
709 .build();
710
711 timerChain.schedule(latestContext.completionEventTimeout(),
712 () -> {
713 eventtaskAccumulator.add(timeoutTask);
714 });
715 }
716 }
717
718 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
719
720 } catch (WorkflowException e) {
721 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
722 latestContext.setCause(e.getMessage());
723 latestContext.setState(WorkflowState.EXCEPTION);
724 workplaceStore.commitContext(latestContext.name(), latestContext, false);
725 }
726
727 return latestContext;
728 }
729
730 /**
731 * Execute workplace.
732 * @param workplace workplace
733 * @return workplace
734 */
735 private Workplace execWorkplace(Workplace workplace) {
736
737 return null;
738 }
739
740}