blob: e4d545d181c0964f366cf5b9115071f2359842f9 [file] [log] [blame]
jaegonkim6a7b5242018-09-12 23:09:42 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.node.JsonNodeFactory;
19import com.google.common.collect.Lists;
jaegonkim6a7b5242018-09-12 23:09:42 +090020import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.LeadershipService;
22import org.onosproject.cluster.NodeId;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.workflow.api.DefaultWorkplace;
26import org.onosproject.workflow.api.EventHintSupplier;
27import org.onosproject.workflow.api.EventTask;
28import org.onosproject.workflow.api.JsonDataModelTree;
29import org.onosproject.workflow.api.SystemWorkflowContext;
30import org.onosproject.workflow.api.EventTimeoutTask;
31import org.onosproject.workflow.api.TimeoutTask;
32import org.onosproject.workflow.api.TimerChain;
33import org.onosproject.workflow.api.Worklet;
34import org.onosproject.workflow.api.Workflow;
35import org.onosproject.workflow.api.WorkflowContext;
36import org.onosproject.workflow.api.WorkflowData;
37import org.onosproject.workflow.api.ContextEventMapStore;
38import org.onosproject.workflow.api.WorkflowState;
39import org.onosproject.workflow.api.WorkflowStore;
40import org.onosproject.workflow.api.WorkflowBatchDelegate;
41import org.onosproject.workflow.api.WorkflowDataEvent;
42import org.onosproject.workflow.api.WorkflowDataListener;
43import org.onosproject.workflow.api.WorkflowException;
44import org.onosproject.workflow.api.HandlerTask;
45import org.onosproject.workflow.api.HandlerTaskBatchDelegate;
46import org.onosproject.workflow.api.Workplace;
47import org.onosproject.workflow.api.WorkplaceStore;
48import org.onosproject.workflow.api.WorkplaceStoreDelegate;
49import org.onosproject.workflow.api.WorkflowExecutionService;
50import org.onosproject.event.AbstractListenerManager;
51import org.onosproject.event.Event;
52import org.onosproject.net.intent.WorkPartitionService;
Ray Milkeydf521292018-10-04 15:13:33 -070053import org.osgi.service.component.annotations.Activate;
54import org.osgi.service.component.annotations.Component;
55import org.osgi.service.component.annotations.Deactivate;
56import org.osgi.service.component.annotations.Reference;
57import org.osgi.service.component.annotations.ReferenceCardinality;
jaegonkim6a7b5242018-09-12 23:09:42 +090058import org.slf4j.Logger;
59
60import java.util.Collection;
61import java.util.List;
62import java.util.Map;
63import java.util.Objects;
64import java.util.UUID;
65import java.util.concurrent.CompletableFuture;
66import java.util.concurrent.ExecutorService;
67import java.util.concurrent.ScheduledExecutorService;
68import java.util.stream.Collectors;
69
70import static java.util.concurrent.Executors.newFixedThreadPool;
71import static java.util.concurrent.Executors.newSingleThreadExecutor;
72import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
73import static org.onlab.util.Tools.groupedThreads;
74import static org.onosproject.workflow.api.WorkflowAttribute.REMOVE_AFTER_COMPLETE;
75import static org.slf4j.LoggerFactory.getLogger;
76
Ray Milkeydf521292018-10-04 15:13:33 -070077@Component(immediate = true, service = WorkflowExecutionService.class)
jaegonkim6a7b5242018-09-12 23:09:42 +090078public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, WorkflowDataListener>
79 implements WorkflowExecutionService {
80
81 protected static final Logger log = getLogger(WorkFlowEngine.class);
Ray Milkeydf521292018-10-04 15:13:33 -070082 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090083 protected CoreService coreService;
84
Ray Milkeydf521292018-10-04 15:13:33 -070085 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090086 protected ClusterService clusterService;
87
Ray Milkeydf521292018-10-04 15:13:33 -070088 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090089 protected LeadershipService leadershipService;
90
Ray Milkeydf521292018-10-04 15:13:33 -070091 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090092 protected WorkPartitionService partitionService;
93
Ray Milkeydf521292018-10-04 15:13:33 -070094 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090095 protected WorkplaceStore workplaceStore;
96
Ray Milkeydf521292018-10-04 15:13:33 -070097 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090098 protected WorkflowStore workflowStore;
99
Ray Milkeydf521292018-10-04 15:13:33 -0700100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +0900101 protected ContextEventMapStore eventMapStore;
102
103 private final WorkplaceStoreDelegate workplaceStoreDelegate = this::post;
104
105 private final WorkflowBatchDelegate workflowBatchDelegate = new InternalWorkflowBatchDelegate();
106 private final WorkflowAccumulator workflowAccumulator = new WorkflowAccumulator(workflowBatchDelegate);
107
108 private final HandlerTaskBatchDelegate eventtaskBatchDelegate = new InternalHandlerTaskBatchDelegate();
109 private final HandlerTaskAccumulator eventtaskAccumulator = new HandlerTaskAccumulator(eventtaskBatchDelegate);
110
111 private ExecutorService workflowBatchExecutor;
112 private ExecutorService workflowExecutor;
113
114 private ExecutorService handlerTaskBatchExecutor;
115 private ExecutorService handlerTaskExecutor;
116
117 private static final int DEFAULT_WORKFLOW_THREADS = 12;
118 private static final int DEFAULT_EVENTTASK_THREADS = 12;
119 private static final int MAX_REGISTER_EVENTMAP_WAITS = 10;
120
121 private ScheduledExecutorService eventMapTriggerExecutor;
122
123 private TimerChain timerChain = new TimerChain();
124
125 public static final String APPID = "org.onosproject.workflow";
126 private ApplicationId appId;
127 private NodeId localNodeId;
128
129 @Activate
130 public void activate() {
131 appId = coreService.registerApplication(APPID);
132 workplaceStore.setDelegate(workplaceStoreDelegate);
133 localNodeId = clusterService.getLocalNode().id();
134 leadershipService.runForLeadership(appId.name());
135
136 workflowBatchExecutor = newSingleThreadExecutor(
137 groupedThreads("onos/workflow", "workflow-batch", log));
138 workflowExecutor = newFixedThreadPool(DEFAULT_WORKFLOW_THREADS,
139 groupedThreads("onos/workflow-exec", "worker-%d", log));
140 handlerTaskBatchExecutor = newSingleThreadExecutor(
141 groupedThreads("onos/workflow", "handlertask-batch", log));
142 handlerTaskExecutor = newFixedThreadPool(DEFAULT_EVENTTASK_THREADS,
143 groupedThreads("onos/handlertask-exec", "worker-%d", log));
144 eventMapTriggerExecutor = newSingleThreadScheduledExecutor(
145 groupedThreads("onos/workflow-engine", "eventmap-trigger-executor"));
146
147 (new WorkplaceWorkflow(this, workflowStore)).registerWorkflows();
148 JsonDataModelTree data = new JsonDataModelTree(JsonNodeFactory.instance.objectNode());
149 workplaceStore.registerWorkplace(Workplace.SYSTEM_WORKPLACE,
150 new DefaultWorkplace(Workplace.SYSTEM_WORKPLACE, data));
151
152 log.info("Started");
153 }
154
155 @Deactivate
156 public void deactivate() {
157 leadershipService.withdraw(appId.name());
158 workplaceStore.unsetDelegate(workplaceStoreDelegate);
159 workflowBatchExecutor.shutdown();
160 workflowExecutor.shutdown();
161 handlerTaskBatchExecutor.shutdown();
162 handlerTaskExecutor.shutdown();
163 eventMapTriggerExecutor.shutdown();
164 log.info("Stopped");
165 }
166
167 @Override
168 public void execInitWorklet(WorkflowContext context) {
169
170 Workflow workflow = workflowStore.get(context.workflowId());
171 if (workflow == null) {
172 log.error("Invalid workflow {}", context.workflowId());
173 return;
174 }
175
176 initWorkletExecution(context);
177 try {
178 Worklet initWorklet = workflow.init(context);
179 if (initWorklet != null) {
180 initWorklet.process(context);
181 }
182
183 } catch (WorkflowException e) {
184 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
185 context.setCause(e.getMessage());
186 context.setState(WorkflowState.EXCEPTION);
187 workplaceStore.commitContext(context.name(), context, false);
188 return;
189 }
190 // trigger the execution of next worklet.
191 workplaceStore.registerContext(context.name(), context);
192 }
193
194 @Override
195 public void eventMapTrigger(Event event, EventHintSupplier supplier) {
196
197 if (event.subject() instanceof SystemWorkflowContext) {
198 return;
199 }
200
201 Map<String, String> eventMap;
202
203 String eventHint;
204 try {
205 eventHint = supplier.apply(event);
206 } catch (Throwable e) {
207 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
208 return;
209 }
210 if (eventHint == null) {
211 // do nothing
212 log.error("Invalid eventHint, event: {}", event);
213 return;
214 }
215
216 try {
217 eventMap = eventMapStore.getEventMap(event.getClass().getName(), eventHint);
218 } catch (WorkflowException e) {
219 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
220 return;
221 }
222
223 if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
224 // do nothing;
225 log.debug("Invalid eventMap, event: {}", event);
226 return;
227 }
228
229 for (Map.Entry<String, String> entry : eventMap.entrySet()) {
230 String contextName = entry.getKey();
231 String workletType = entry.getValue();
232 WorkflowContext context = workplaceStore.getContext(contextName);
233 if (Objects.isNull(context)) {
234 log.info("Invalid context: {}, event: {}", contextName, event);
235 continue;
236 }
237 EventTask eventtask = EventTask.builder()
238 .event(event)
239 .eventHint(eventHint)
240 .context(context)
241 .workletType(workletType)
242 .build();
243
244 log.info("eventtaskAccumulator.add: task: {}", eventtask);
245 eventtaskAccumulator.add(eventtask);
246 }
247 }
248
249 @Override
250 public void registerEventMap(Class<? extends Event> eventType, String eventHint,
251 String contextName, String workletType) throws WorkflowException {
252 eventMapStore.registerEventMap(eventType.getName(), eventHint, contextName, workletType);
253 for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
254 Map<String, String> eventMap = eventMapStore.getEventMap(eventType.getName(), eventHint);
255 if (eventMap != null && eventMap.containsKey(contextName)) {
256 return;
257 }
258 try {
259 log.info("sleep {}", i);
260 Thread.sleep(10L * (i + 1));
261 } catch (InterruptedException e) {
Ray Milkeyfe6afd82018-11-26 14:03:20 -0800262 Thread.currentThread().interrupt();
jaegonkim6a7b5242018-09-12 23:09:42 +0900263 }
264 }
265 }
266
267 @Override
268 protected void post(WorkflowDataEvent event) {
269
270 if (event.subject() == null || !isRelevant(event.subject())) {
271 log.debug("ignore event {}", event);
272 return;
273 }
274
275 // trigger next worklet selection
276 WorkflowData dataModelContainer = event.subject();
277 switch (event.type()) {
278 case INSERT:
279 case UPDATE:
280 if (dataModelContainer.triggerNext()) {
281 log.info("workflowAccumulator.add: {}", dataModelContainer);
282 workflowAccumulator.add(dataModelContainer);
283 } else {
284 log.debug("pass-workflowAccumulator.add: {}", dataModelContainer);
285 }
286 break;
287 case REMOVE:
288 break;
289 default:
290 }
291
292 // trigger EventTask for WorkflowDataEvent
293 eventMapTriggerExecutor.submit(
294 () -> eventMapTrigger(
295 event,
296 // event hint supplier
297 (ev) -> {
298 if (ev == null || ev.subject() == null) {
299 return null;
300 }
301
302 if (ev.subject() instanceof WorkflowData) {
303 return ((WorkflowData) ev.subject()).name();
304 } else {
305 return null;
306 }
307 }
308 )
309 );
310 }
311
312 /**
313 * Checks whether this workflow data job is relevant to this ONOS node.
314 * @param job workflow data
315 * @return checking result
316 */
317 private boolean isRelevant(WorkflowData job) {
318 // distributes event processing with work-partition
319 return partitionService.isMine(job.distributor(), this::stringHash);
320 }
321
322 /**
323 * Gets hash of the string.
324 * @param str string to get a hash
325 * @return hash value
326 */
327 public Long stringHash(String str) {
328 return UUID.nameUUIDFromBytes(str.getBytes()).getMostSignificantBits();
329 }
330
331 /**
332 * Class for handler task batch delegation.
333 */
334 private class InternalHandlerTaskBatchDelegate implements HandlerTaskBatchDelegate {
335 @Override
336 public void execute(Collection<Collection<HandlerTask>> operations) {
337 log.debug("Execute {} operation(s).", operations.size());
338
339 CompletableFuture.runAsync(() -> {
340 List<CompletableFuture<Collection<HandlerTask>>> futures = operations.stream()
341 .map(
342 x -> CompletableFuture.completedFuture(x)
343 .thenApplyAsync(WorkFlowEngine.this::processHandlerTask, handlerTaskExecutor)
344 .exceptionally(e -> null)
345 )
346 .collect(Collectors.toList());
347
348 // waiting the completion of futures
349 futures.parallelStream().forEach(x -> x.join());
350
351 }, handlerTaskBatchExecutor).exceptionally(e -> {
352 log.error("Error submitting batches:", e);
353 return null;
354 }).thenRun(eventtaskAccumulator::ready);
355 }
356 }
357
358 /**
359 * Initializes worklet execution.
360 * @param context workflow context
361 */
362 private void initWorkletExecution(WorkflowContext context) {
363 context.setState(WorkflowState.RUNNING);
364 context.setCause("");
365 context.setWorkflowExecutionService(this);
366 context.setWorkflowStore(workflowStore);
367 context.setWorkplaceStore(workplaceStore);
368 context.waitCompletion(null, null, null, 0L);
369 context.setTriggerNext(false);
370 }
371
372 /**
373 * Processes handler tasks.
374 * @param tasks handler tasks
375 * @return handler tasks processed
376 */
377 private Collection<HandlerTask> processHandlerTask(Collection<HandlerTask> tasks) {
378
379 for (HandlerTask task : tasks) {
380 if (task instanceof EventTimeoutTask) {
381 execEventTimeoutTask((EventTimeoutTask) task);
382 } else if (task instanceof TimeoutTask) {
383 execTimeoutTask((TimeoutTask) task);
384 } else if (task instanceof EventTask) {
385 execEventTask((EventTask) task);
386 } else {
387 log.error("Unsupported handler task {}", task);
388 }
389 }
390
391 return null;
392 }
393
394 /**
395 * Executes event task.
396 * @param task event task
397 * @return event task
398 */
399 private EventTask execEventTask(EventTask task) {
400
401 Map<String, String> eventMap = null;
402 try {
403 eventMap = eventMapStore.getEventMap(task.event().getClass().getName(), task.eventHint());
404 } catch (WorkflowException e) {
405 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
406 return task;
407 }
408
409 if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
410 return task;
411 }
412
413 if (Objects.isNull(eventMap.get(task.context().name()))) {
414 return task;
415 }
416
417 log.debug("execEventTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
418
419 WorkflowContext context = (WorkflowContext) (task.context());
420 Workflow workflow = workflowStore.get(context.workflowId());
421 if (workflow == null) {
422 log.error("Invalid workflow {}", context.workflowId());
423 return task;
424 }
425
426 WorkflowContext latestContext = workplaceStore.getContext(context.name());
427 if (latestContext == null) {
428 log.error("Invalid workflow context {}", context.name());
429 return task;
430 }
431
432 try {
433 Worklet worklet = workflow.getWorkletInstance(task.workletType());
434 if (!Objects.equals(latestContext.current(), worklet.tag())) {
435 log.error("Current worklet({}) is not mismatched with task work({}). Ignored.",
436 latestContext.current(), worklet.tag());
437 return task;
438 }
439
440 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
441 log.error("Current worklet is {}, Ignored", worklet);
442 return task;
443 }
444
445 initWorkletExecution(latestContext);
446
447 log.info("processHandlerTask.isCompleted-task:{}, latest:{}", task, latestContext);
448 if (worklet.isCompleted(latestContext, task.event())) {
449 eventMapStore.unregisterEventMap(
450 task.eventType(), task.eventHint(), latestContext.name());
451
452 workplaceStore.commitContext(latestContext.name(), latestContext, true);
453 return null;
454 } else {
455 workplaceStore.commitContext(latestContext.name(), latestContext, false);
456 }
457
458 } catch (WorkflowException e) {
459 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
460 latestContext.setCause(e.getMessage());
461 latestContext.setState(WorkflowState.EXCEPTION);
462 workplaceStore.commitContext(latestContext.name(), latestContext, false);
463 }
464
465 return task;
466 }
467
468 /**
469 * Executes event timeout task.
470 * @param task event timeout task
471 * @return handler task
472 */
473 private HandlerTask execEventTimeoutTask(EventTimeoutTask task) {
474
475 Map<String, String> eventMap = null;
476 try {
477 eventMap = eventMapStore.getEventMap(task.eventType(), task.eventHint());
478 } catch (WorkflowException e) {
479 log.error("execEventTimeoutTask: Exception: {}, trace: {}",
480 e, Lists.newArrayList(e.getStackTrace()));
481 return task;
482 }
483
484 if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
485 return task;
486 }
487
488 if (Objects.isNull(eventMap.get(task.context().name()))) {
489 return task;
490 }
491
492 log.debug("execEventTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
493
494 WorkflowContext context = task.context();
495 Workflow workflow = workflowStore.get(context.workflowId());
496 if (workflow == null) {
497 log.error("execEventTimeoutTask: Invalid workflow {}", context.workflowId());
498 return task;
499 }
500
501 WorkflowContext latestContext = workplaceStore.getContext(context.name());
502 if (latestContext == null) {
503 log.error("execEventTimeoutTask: Invalid workflow context {}", context.name());
504 return task;
505 }
506
507 try {
508 Worklet worklet = workflow.getWorkletInstance(task.workletType());
509 if (!Objects.equals(latestContext.current(), worklet.tag())) {
510 log.error("execEventTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
511 latestContext.current(), worklet.tag());
512 return task;
513 }
514
515 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
516 log.error("execEventTimeoutTask: Current worklet is {}, Ignored", worklet);
517 return task;
518 }
519
520 initWorkletExecution(latestContext);
521
522 log.info("execEventTimeoutTask.timeout-task:{}, latest:{}", task, latestContext);
523 eventMapStore.unregisterEventMap(
524 task.eventType(), task.eventHint(), latestContext.name());
525
526 worklet.timeout(latestContext);
527 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
528
529 } catch (WorkflowException e) {
530 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
531 latestContext.setCause(e.getMessage());
532 latestContext.setState(WorkflowState.EXCEPTION);
533 workplaceStore.commitContext(latestContext.name(), latestContext, false);
534 }
535
536 return task;
537 }
538
539 /**
540 * Executes timeout task.
541 * @param task time out task
542 * @return handler task
543 */
544 private HandlerTask execTimeoutTask(TimeoutTask task) {
545
546 log.debug("execTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
547
548 WorkflowContext context = (WorkflowContext) (task.context());
549 Workflow workflow = workflowStore.get(context.workflowId());
550 if (workflow == null) {
551 log.error("execTimeoutTask: Invalid workflow {}", context.workflowId());
552 return task;
553 }
554
555 WorkflowContext latestContext = workplaceStore.getContext(context.name());
556 if (latestContext == null) {
557 log.error("execTimeoutTask: Invalid workflow context {}", context.name());
558 return task;
559 }
560
561 try {
562 Worklet worklet = workflow.getWorkletInstance(task.workletType());
563 if (!Objects.equals(latestContext.current(), worklet.tag())) {
564 log.error("execTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
565 latestContext.current(), worklet.tag());
566 return task;
567 }
568
569 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
570 log.error("execTimeoutTask: Current worklet is {}, Ignored", worklet);
571 return task;
572 }
573
574 initWorkletExecution(latestContext);
575
576 worklet.timeout(latestContext);
577 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
578
579 } catch (WorkflowException e) {
580 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
581 latestContext.setCause(e.getMessage());
582 latestContext.setState(WorkflowState.EXCEPTION);
583 workplaceStore.commitContext(latestContext.name(), latestContext, false);
584 }
585
586 return task;
587 }
588
589 /**
590 * Class for delegation of workflow batch execution.
591 */
592 private class InternalWorkflowBatchDelegate implements WorkflowBatchDelegate {
593 @Override
594 public void execute(Collection<WorkflowData> operations) {
595 log.debug("Execute {} operation(s).", operations.size());
596
597 CompletableFuture.runAsync(() -> {
598 List<CompletableFuture<WorkflowData>> futures = operations.stream()
599 .map(
600 x -> CompletableFuture.completedFuture(x)
601 .thenApplyAsync(WorkFlowEngine.this::execWorkflow, workflowExecutor)
602 .exceptionally(e -> null)
603 )
604 .collect(Collectors.toList());
605
606 // waiting the completion of futures
607 futures.parallelStream().forEach(x -> x.join());
608
609 }, workflowBatchExecutor).exceptionally(e -> {
610 log.error("Error submitting batches:", e);
611 return null;
612 }).thenRun(workflowAccumulator::ready);
613 }
614 }
615
616 /**
617 * Executes workflow.
618 * @param dataModelContainer workflow data model container(workflow or workplace)
619 * @return
620 */
621 private WorkflowData execWorkflow(WorkflowData dataModelContainer) {
622 if (dataModelContainer instanceof WorkflowContext) {
623 return execWorkflowContext((WorkflowContext) dataModelContainer);
624 } else if (dataModelContainer instanceof Workplace) {
625 return execWorkplace((Workplace) dataModelContainer);
626 } else {
627 log.error("Invalid context {}", dataModelContainer);
628 return null;
629 }
630 }
631
632 /**
633 * Executes workflow context.
634 * @param context workflow context
635 * @return workflow context
636 */
637 private WorkflowContext execWorkflowContext(WorkflowContext context) {
638
639 Workflow workflow = workflowStore.get(context.workflowId());
640 if (workflow == null) {
641 log.error("Invalid workflow {}", context.workflowId());
642 return null;
643 }
644
645 final WorkflowContext latestContext = workplaceStore.getContext(context.name());
646 if (latestContext == null) {
647 log.error("Invalid workflow context {}", context.name());
648 return null;
649 }
650
651 initWorkletExecution(latestContext);
652
653 try {
654 final Worklet worklet = workflow.next(latestContext);
655
656 if (worklet == Worklet.Common.INIT) {
657 log.error("workflow.next gave INIT. It cannot be executed (context: {})", context.name());
658 return latestContext;
659 }
660
661 latestContext.setCurrent(worklet);
662 if (worklet == Worklet.Common.COMPLETED) {
663
664 if (workflow.attributes().contains(REMOVE_AFTER_COMPLETE)) {
665 workplaceStore.removeContext(latestContext.name());
666 return null;
667 } else {
668 latestContext.setState(WorkflowState.IDLE);
669 workplaceStore.commitContext(latestContext.name(), latestContext, false);
670 return latestContext;
671 }
672 }
673
674 log.info("execWorkflowContext.process:{}, {}", worklet.tag(), latestContext);
675 worklet.process(latestContext);
676
677 if (latestContext.completionEventType() != null) {
678 if (latestContext.completionEventGenerator() == null) {
679 String msg = String.format("Invalid exepecting event(%s), generator(%s)",
680 latestContext.completionEventType(),
681 latestContext.completionEventGenerator());
682 throw new WorkflowException(msg);
683 }
684
685 registerEventMap(latestContext.completionEventType(), latestContext.completionEventHint(),
686 latestContext.name(), worklet.tag());
687
688 latestContext.completionEventGenerator().apply();
689
690 if (latestContext.completionEventTimeout() != 0L) {
691 final EventTimeoutTask eventTimeoutTask = EventTimeoutTask.builder()
692 .context(latestContext)
693 .workletType(worklet.tag())
694 .eventType(latestContext.completionEventType().getName())
695 .eventHint(latestContext.completionEventHint())
696 .build();
697 timerChain.schedule(latestContext.completionEventTimeout(),
698 () -> {
699 eventtaskAccumulator.add(eventTimeoutTask);
700 });
701 }
702 } else {
703 if (latestContext.completionEventTimeout() != 0L) {
704 final TimeoutTask timeoutTask = TimeoutTask.builder()
705 .context(latestContext)
706 .workletType(worklet.tag())
707 .build();
708
709 timerChain.schedule(latestContext.completionEventTimeout(),
710 () -> {
711 eventtaskAccumulator.add(timeoutTask);
712 });
713 }
714 }
715
716 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
717
718 } catch (WorkflowException e) {
719 log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
720 latestContext.setCause(e.getMessage());
721 latestContext.setState(WorkflowState.EXCEPTION);
722 workplaceStore.commitContext(latestContext.name(), latestContext, false);
723 }
724
725 return latestContext;
726 }
727
728 /**
729 * Execute workplace.
730 * @param workplace workplace
731 * @return workplace
732 */
733 private Workplace execWorkplace(Workplace workplace) {
734
735 return null;
736 }
737
738}