blob: b472c5f74bae51ea3bacc71bacd5a4e8cc14b127 [file] [log] [blame]
jaegonkim6a7b5242018-09-12 23:09:42 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.node.JsonNodeFactory;
jaegonkim6a7b5242018-09-12 23:09:42 +090019import org.onosproject.cluster.ClusterService;
20import org.onosproject.cluster.LeadershipService;
21import org.onosproject.cluster.NodeId;
22import org.onosproject.core.ApplicationId;
23import org.onosproject.core.CoreService;
jaegonkime0f45b52018-10-09 20:23:26 +090024import org.onosproject.store.service.StorageException;
jaegonkim6a7b5242018-09-12 23:09:42 +090025import org.onosproject.workflow.api.DefaultWorkplace;
26import org.onosproject.workflow.api.EventHintSupplier;
27import org.onosproject.workflow.api.EventTask;
jaegonkime0f45b52018-10-09 20:23:26 +090028import org.onosproject.workflow.api.JsonDataModelInjector;
jaegonkim6a7b5242018-09-12 23:09:42 +090029import org.onosproject.workflow.api.JsonDataModelTree;
jaegonkime0f45b52018-10-09 20:23:26 +090030import org.onosproject.workflow.api.ProgramCounter;
jaegonkim6a7b5242018-09-12 23:09:42 +090031import org.onosproject.workflow.api.SystemWorkflowContext;
32import org.onosproject.workflow.api.EventTimeoutTask;
33import org.onosproject.workflow.api.TimeoutTask;
34import org.onosproject.workflow.api.TimerChain;
35import org.onosproject.workflow.api.Worklet;
36import org.onosproject.workflow.api.Workflow;
37import org.onosproject.workflow.api.WorkflowContext;
38import org.onosproject.workflow.api.WorkflowData;
39import org.onosproject.workflow.api.ContextEventMapStore;
40import org.onosproject.workflow.api.WorkflowState;
41import org.onosproject.workflow.api.WorkflowStore;
42import org.onosproject.workflow.api.WorkflowBatchDelegate;
43import org.onosproject.workflow.api.WorkflowDataEvent;
44import org.onosproject.workflow.api.WorkflowDataListener;
45import org.onosproject.workflow.api.WorkflowException;
46import org.onosproject.workflow.api.HandlerTask;
47import org.onosproject.workflow.api.HandlerTaskBatchDelegate;
48import org.onosproject.workflow.api.Workplace;
49import org.onosproject.workflow.api.WorkplaceStore;
50import org.onosproject.workflow.api.WorkplaceStoreDelegate;
51import org.onosproject.workflow.api.WorkflowExecutionService;
52import org.onosproject.event.AbstractListenerManager;
53import org.onosproject.event.Event;
54import org.onosproject.net.intent.WorkPartitionService;
Ray Milkeydf521292018-10-04 15:13:33 -070055import org.osgi.service.component.annotations.Activate;
56import org.osgi.service.component.annotations.Component;
57import org.osgi.service.component.annotations.Deactivate;
58import org.osgi.service.component.annotations.Reference;
59import org.osgi.service.component.annotations.ReferenceCardinality;
jaegonkim6a7b5242018-09-12 23:09:42 +090060import org.slf4j.Logger;
61
62import java.util.Collection;
63import java.util.List;
64import java.util.Map;
65import java.util.Objects;
nitinanandf3f94c62019-02-08 10:36:39 +053066import java.util.Set;
jaegonkim6a7b5242018-09-12 23:09:42 +090067import java.util.UUID;
68import java.util.concurrent.CompletableFuture;
69import java.util.concurrent.ExecutorService;
70import java.util.concurrent.ScheduledExecutorService;
71import java.util.stream.Collectors;
72
73import static java.util.concurrent.Executors.newFixedThreadPool;
74import static java.util.concurrent.Executors.newSingleThreadExecutor;
75import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
76import static org.onlab.util.Tools.groupedThreads;
77import static org.onosproject.workflow.api.WorkflowAttribute.REMOVE_AFTER_COMPLETE;
78import static org.slf4j.LoggerFactory.getLogger;
79
Ray Milkeydf521292018-10-04 15:13:33 -070080@Component(immediate = true, service = WorkflowExecutionService.class)
jaegonkim6a7b5242018-09-12 23:09:42 +090081public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, WorkflowDataListener>
82 implements WorkflowExecutionService {
83
84 protected static final Logger log = getLogger(WorkFlowEngine.class);
Ray Milkeydf521292018-10-04 15:13:33 -070085 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090086 protected CoreService coreService;
87
Ray Milkeydf521292018-10-04 15:13:33 -070088 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090089 protected ClusterService clusterService;
90
Ray Milkeydf521292018-10-04 15:13:33 -070091 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090092 protected LeadershipService leadershipService;
93
Ray Milkeydf521292018-10-04 15:13:33 -070094 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090095 protected WorkPartitionService partitionService;
96
Ray Milkeydf521292018-10-04 15:13:33 -070097 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090098 protected WorkplaceStore workplaceStore;
99
Ray Milkeydf521292018-10-04 15:13:33 -0700100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +0900101 protected WorkflowStore workflowStore;
102
Ray Milkeydf521292018-10-04 15:13:33 -0700103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +0900104 protected ContextEventMapStore eventMapStore;
105
106 private final WorkplaceStoreDelegate workplaceStoreDelegate = this::post;
107
108 private final WorkflowBatchDelegate workflowBatchDelegate = new InternalWorkflowBatchDelegate();
109 private final WorkflowAccumulator workflowAccumulator = new WorkflowAccumulator(workflowBatchDelegate);
110
111 private final HandlerTaskBatchDelegate eventtaskBatchDelegate = new InternalHandlerTaskBatchDelegate();
112 private final HandlerTaskAccumulator eventtaskAccumulator = new HandlerTaskAccumulator(eventtaskBatchDelegate);
113
114 private ExecutorService workflowBatchExecutor;
115 private ExecutorService workflowExecutor;
116
117 private ExecutorService handlerTaskBatchExecutor;
118 private ExecutorService handlerTaskExecutor;
119
120 private static final int DEFAULT_WORKFLOW_THREADS = 12;
121 private static final int DEFAULT_EVENTTASK_THREADS = 12;
122 private static final int MAX_REGISTER_EVENTMAP_WAITS = 10;
123
124 private ScheduledExecutorService eventMapTriggerExecutor;
125
126 private TimerChain timerChain = new TimerChain();
127
jaegonkime0f45b52018-10-09 20:23:26 +0900128 private JsonDataModelInjector dataModelInjector = new JsonDataModelInjector();
129
jaegonkim6a7b5242018-09-12 23:09:42 +0900130 public static final String APPID = "org.onosproject.workflow";
131 private ApplicationId appId;
132 private NodeId localNodeId;
133
134 @Activate
135 public void activate() {
136 appId = coreService.registerApplication(APPID);
137 workplaceStore.setDelegate(workplaceStoreDelegate);
138 localNodeId = clusterService.getLocalNode().id();
139 leadershipService.runForLeadership(appId.name());
140
141 workflowBatchExecutor = newSingleThreadExecutor(
142 groupedThreads("onos/workflow", "workflow-batch", log));
143 workflowExecutor = newFixedThreadPool(DEFAULT_WORKFLOW_THREADS,
144 groupedThreads("onos/workflow-exec", "worker-%d", log));
145 handlerTaskBatchExecutor = newSingleThreadExecutor(
146 groupedThreads("onos/workflow", "handlertask-batch", log));
147 handlerTaskExecutor = newFixedThreadPool(DEFAULT_EVENTTASK_THREADS,
148 groupedThreads("onos/handlertask-exec", "worker-%d", log));
149 eventMapTriggerExecutor = newSingleThreadScheduledExecutor(
150 groupedThreads("onos/workflow-engine", "eventmap-trigger-executor"));
151
152 (new WorkplaceWorkflow(this, workflowStore)).registerWorkflows();
153 JsonDataModelTree data = new JsonDataModelTree(JsonNodeFactory.instance.objectNode());
154 workplaceStore.registerWorkplace(Workplace.SYSTEM_WORKPLACE,
155 new DefaultWorkplace(Workplace.SYSTEM_WORKPLACE, data));
156
157 log.info("Started");
158 }
159
160 @Deactivate
161 public void deactivate() {
162 leadershipService.withdraw(appId.name());
163 workplaceStore.unsetDelegate(workplaceStoreDelegate);
164 workflowBatchExecutor.shutdown();
165 workflowExecutor.shutdown();
166 handlerTaskBatchExecutor.shutdown();
167 handlerTaskExecutor.shutdown();
168 eventMapTriggerExecutor.shutdown();
169 log.info("Stopped");
170 }
171
172 @Override
173 public void execInitWorklet(WorkflowContext context) {
174
175 Workflow workflow = workflowStore.get(context.workflowId());
176 if (workflow == null) {
177 log.error("Invalid workflow {}", context.workflowId());
178 return;
179 }
180
181 initWorkletExecution(context);
182 try {
183 Worklet initWorklet = workflow.init(context);
184 if (initWorklet != null) {
jaegonkime0f45b52018-10-09 20:23:26 +0900185
186 log.info("{} worklet.process:{}", context.name(), initWorklet.tag());
187 log.trace("{} context: {}", context.name(), context);
188
189 dataModelInjector.inject(initWorklet, context);
jaegonkim6a7b5242018-09-12 23:09:42 +0900190 initWorklet.process(context);
jaegonkime0f45b52018-10-09 20:23:26 +0900191 dataModelInjector.inhale(initWorklet, context);
192
193 log.info("{} worklet.process(done): {}", context.name(), initWorklet.tag());
194 log.trace("{} context: {}", context.name(), context);
jaegonkim6a7b5242018-09-12 23:09:42 +0900195 }
196
197 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900198 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900199 context.setCause(e.getMessage());
200 context.setState(WorkflowState.EXCEPTION);
201 workplaceStore.commitContext(context.name(), context, false);
202 return;
203 }
204 // trigger the execution of next worklet.
205 workplaceStore.registerContext(context.name(), context);
206 }
207
208 @Override
jaegonkime0f45b52018-10-09 20:23:26 +0900209 public void eval(String contextName) {
210
211 final WorkflowContext latestContext = workplaceStore.getContext(contextName);
212 if (latestContext == null) {
213 log.error("Invalid workflow context {}", contextName);
214 return;
215 }
216
217 initWorkletExecution(latestContext);
218
219 workplaceStore.commitContext(latestContext.name(), latestContext, true);
220 }
221
222 @Override
jaegonkim6a7b5242018-09-12 23:09:42 +0900223 public void eventMapTrigger(Event event, EventHintSupplier supplier) {
224
225 if (event.subject() instanceof SystemWorkflowContext) {
226 return;
227 }
228
229 Map<String, String> eventMap;
230
231 String eventHint;
232 try {
233 eventHint = supplier.apply(event);
234 } catch (Throwable e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900235 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900236 return;
237 }
238 if (eventHint == null) {
239 // do nothing
240 log.error("Invalid eventHint, event: {}", event);
241 return;
242 }
243
244 try {
nitinanandf3f94c62019-02-08 10:36:39 +0530245 eventMap = eventMapStore.getEventMapByHint(event.getClass().getName(), eventHint);
jaegonkim6a7b5242018-09-12 23:09:42 +0900246 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900247 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900248 return;
249 }
250
251 if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
252 // do nothing;
253 log.debug("Invalid eventMap, event: {}", event);
254 return;
255 }
256
257 for (Map.Entry<String, String> entry : eventMap.entrySet()) {
258 String contextName = entry.getKey();
jaegonkime0f45b52018-10-09 20:23:26 +0900259 String strProgramCounter = entry.getValue();
260 ProgramCounter pc;
261 try {
262 pc = ProgramCounter.valueOf(strProgramCounter);
263 } catch (IllegalArgumentException e) {
264 log.error("Exception: ", e);
265 return;
266 }
267
jaegonkim6a7b5242018-09-12 23:09:42 +0900268 WorkflowContext context = workplaceStore.getContext(contextName);
269 if (Objects.isNull(context)) {
270 log.info("Invalid context: {}, event: {}", contextName, event);
271 continue;
272 }
jaegonkime0f45b52018-10-09 20:23:26 +0900273 EventTask eventtask = null;
274 try {
275 eventtask = EventTask.builder()
jaegonkim6a7b5242018-09-12 23:09:42 +0900276 .event(event)
277 .eventHint(eventHint)
278 .context(context)
jaegonkime0f45b52018-10-09 20:23:26 +0900279 .programCounter(pc)
jaegonkim6a7b5242018-09-12 23:09:42 +0900280 .build();
jaegonkime0f45b52018-10-09 20:23:26 +0900281 } catch (WorkflowException e) {
282 log.error("Exception: ", e);
283 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900284
jaegonkime0f45b52018-10-09 20:23:26 +0900285 log.debug("eventtaskAccumulator.add: task: {}", eventtask);
286 if (!Objects.isNull(eventtask)) {
287 eventtaskAccumulator.add(eventtask);
288 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900289 }
290 }
291
292 @Override
nitinanandf3f94c62019-02-08 10:36:39 +0530293 public void registerEventMap(Class<? extends Event> eventType, Set<String> eventHintSet,
jaegonkime0f45b52018-10-09 20:23:26 +0900294 String contextName, String programCounterString) throws WorkflowException {
nitinanandf3f94c62019-02-08 10:36:39 +0530295 eventMapStore.registerEventMap(eventType.getName(), eventHintSet, contextName, programCounterString);
296 for (String eventHint : eventHintSet) {
297 for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
298 Map<String, String> eventMap = eventMapStore.getEventMapByHint(eventType.getName(), eventHint);
299 if (eventMap != null && eventMap.containsKey(contextName)) {
300 break;
301 }
302 try {
303 log.info("sleep {}", i);
304 Thread.sleep(10L * (i + 1));
305 } catch (InterruptedException e) {
306 log.error("Exception: ", e);
307 Thread.currentThread().interrupt();
308 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900309 }
310 }
nitinanandf3f94c62019-02-08 10:36:39 +0530311
jaegonkim6a7b5242018-09-12 23:09:42 +0900312 }
313
314 @Override
315 protected void post(WorkflowDataEvent event) {
316
317 if (event.subject() == null || !isRelevant(event.subject())) {
318 log.debug("ignore event {}", event);
319 return;
320 }
321
322 // trigger next worklet selection
323 WorkflowData dataModelContainer = event.subject();
324 switch (event.type()) {
325 case INSERT:
326 case UPDATE:
327 if (dataModelContainer.triggerNext()) {
jaegonkime0f45b52018-10-09 20:23:26 +0900328 log.debug("workflowAccumulator.add: {}", dataModelContainer);
jaegonkim6a7b5242018-09-12 23:09:42 +0900329 workflowAccumulator.add(dataModelContainer);
330 } else {
331 log.debug("pass-workflowAccumulator.add: {}", dataModelContainer);
332 }
333 break;
334 case REMOVE:
335 break;
336 default:
337 }
338
339 // trigger EventTask for WorkflowDataEvent
340 eventMapTriggerExecutor.submit(
341 () -> eventMapTrigger(
342 event,
343 // event hint supplier
344 (ev) -> {
345 if (ev == null || ev.subject() == null) {
346 return null;
347 }
348
349 if (ev.subject() instanceof WorkflowData) {
350 return ((WorkflowData) ev.subject()).name();
351 } else {
352 return null;
353 }
354 }
355 )
356 );
357 }
358
359 /**
360 * Checks whether this workflow data job is relevant to this ONOS node.
361 * @param job workflow data
362 * @return checking result
363 */
364 private boolean isRelevant(WorkflowData job) {
365 // distributes event processing with work-partition
366 return partitionService.isMine(job.distributor(), this::stringHash);
367 }
368
369 /**
370 * Gets hash of the string.
371 * @param str string to get a hash
372 * @return hash value
373 */
374 public Long stringHash(String str) {
375 return UUID.nameUUIDFromBytes(str.getBytes()).getMostSignificantBits();
376 }
377
378 /**
379 * Class for handler task batch delegation.
380 */
381 private class InternalHandlerTaskBatchDelegate implements HandlerTaskBatchDelegate {
382 @Override
383 public void execute(Collection<Collection<HandlerTask>> operations) {
384 log.debug("Execute {} operation(s).", operations.size());
385
386 CompletableFuture.runAsync(() -> {
387 List<CompletableFuture<Collection<HandlerTask>>> futures = operations.stream()
388 .map(
389 x -> CompletableFuture.completedFuture(x)
390 .thenApplyAsync(WorkFlowEngine.this::processHandlerTask, handlerTaskExecutor)
391 .exceptionally(e -> null)
392 )
393 .collect(Collectors.toList());
394
395 // waiting the completion of futures
396 futures.parallelStream().forEach(x -> x.join());
397
398 }, handlerTaskBatchExecutor).exceptionally(e -> {
399 log.error("Error submitting batches:", e);
400 return null;
401 }).thenRun(eventtaskAccumulator::ready);
402 }
403 }
404
405 /**
406 * Initializes worklet execution.
407 * @param context workflow context
408 */
409 private void initWorkletExecution(WorkflowContext context) {
410 context.setState(WorkflowState.RUNNING);
411 context.setCause("");
412 context.setWorkflowExecutionService(this);
413 context.setWorkflowStore(workflowStore);
414 context.setWorkplaceStore(workplaceStore);
415 context.waitCompletion(null, null, null, 0L);
416 context.setTriggerNext(false);
417 }
418
419 /**
420 * Processes handler tasks.
421 * @param tasks handler tasks
422 * @return handler tasks processed
423 */
424 private Collection<HandlerTask> processHandlerTask(Collection<HandlerTask> tasks) {
425
426 for (HandlerTask task : tasks) {
427 if (task instanceof EventTimeoutTask) {
428 execEventTimeoutTask((EventTimeoutTask) task);
429 } else if (task instanceof TimeoutTask) {
430 execTimeoutTask((TimeoutTask) task);
431 } else if (task instanceof EventTask) {
432 execEventTask((EventTask) task);
433 } else {
434 log.error("Unsupported handler task {}", task);
435 }
436 }
437
438 return null;
439 }
440
441 /**
442 * Executes event task.
443 * @param task event task
444 * @return event task
445 */
446 private EventTask execEventTask(EventTask task) {
447
nitinanandf3f94c62019-02-08 10:36:39 +0530448 if (!eventMapStore.isEventMapPresent(task.context().name())) {
449 log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name());
jaegonkim6a7b5242018-09-12 23:09:42 +0900450 return task;
451 }
452
453 log.debug("execEventTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
454
455 WorkflowContext context = (WorkflowContext) (task.context());
456 Workflow workflow = workflowStore.get(context.workflowId());
457 if (workflow == null) {
458 log.error("Invalid workflow {}", context.workflowId());
459 return task;
460 }
461
462 WorkflowContext latestContext = workplaceStore.getContext(context.name());
463 if (latestContext == null) {
464 log.error("Invalid workflow context {}", context.name());
465 return task;
466 }
467
468 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900469 if (!Objects.equals(latestContext.current(), task.programCounter())) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900470 log.error("Current worklet({}) is not mismatched with task work({}). Ignored.",
jaegonkime0f45b52018-10-09 20:23:26 +0900471 latestContext.current(), task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900472 return task;
473 }
474
jaegonkime0f45b52018-10-09 20:23:26 +0900475 Worklet worklet = workflow.getWorkletInstance(task.programCounter().workletType());
476 if (Worklet.Common.COMPLETED.equals(worklet) || Worklet.Common.INIT.equals(worklet)) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900477 log.error("Current worklet is {}, Ignored", worklet);
478 return task;
479 }
480
481 initWorkletExecution(latestContext);
482
jaegonkime0f45b52018-10-09 20:23:26 +0900483 log.info("{} worklet.isCompleted:{}", latestContext.name(), worklet.tag());
484 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
485
486 dataModelInjector.inject(worklet, latestContext);
487 boolean completed = worklet.isCompleted(latestContext, task.event());
488 dataModelInjector.inhale(worklet, latestContext);
489
490 if (completed) {
491
492 log.info("{} worklet.isCompleted(true):{}", latestContext.name(), worklet.tag());
493 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
494
jaegonkim6a7b5242018-09-12 23:09:42 +0900495 eventMapStore.unregisterEventMap(
nitinanandf3f94c62019-02-08 10:36:39 +0530496 task.eventType(), latestContext.name());
jaegonkim6a7b5242018-09-12 23:09:42 +0900497
jaegonkime0f45b52018-10-09 20:23:26 +0900498 //completed case
nitinanandf3f94c62019-02-08 10:36:39 +0530499 //increase program counter
jaegonkime0f45b52018-10-09 20:23:26 +0900500 ProgramCounter pc = latestContext.current();
501 latestContext.setCurrent(workflow.increased(pc));
502
jaegonkim6a7b5242018-09-12 23:09:42 +0900503 workplaceStore.commitContext(latestContext.name(), latestContext, true);
504 return null;
505 } else {
jaegonkime0f45b52018-10-09 20:23:26 +0900506
507 log.info("{} worklet.isCompleted(false):{}", latestContext.name(), worklet.tag());
508 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
509
jaegonkim6a7b5242018-09-12 23:09:42 +0900510 workplaceStore.commitContext(latestContext.name(), latestContext, false);
511 }
512
513 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900514 log.error("Exception: ", e);
515 latestContext.setCause(e.getMessage());
516 latestContext.setState(WorkflowState.EXCEPTION);
517 workplaceStore.commitContext(latestContext.name(), latestContext, false);
518 } catch (StorageException e) {
519 log.error("Exception: ", e);
520 // StorageException does not commit context.
521 } catch (Exception e) {
522 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900523 latestContext.setCause(e.getMessage());
524 latestContext.setState(WorkflowState.EXCEPTION);
525 workplaceStore.commitContext(latestContext.name(), latestContext, false);
526 }
527
528 return task;
529 }
530
531 /**
532 * Executes event timeout task.
533 * @param task event timeout task
534 * @return handler task
535 */
536 private HandlerTask execEventTimeoutTask(EventTimeoutTask task) {
537
nitinanandf3f94c62019-02-08 10:36:39 +0530538 if (!eventMapStore.isEventMapPresent(task.context().name())) {
539 log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name());
jaegonkim6a7b5242018-09-12 23:09:42 +0900540 return task;
541 }
542
543 log.debug("execEventTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
544
jaegonkime0f45b52018-10-09 20:23:26 +0900545 WorkflowContext context = (WorkflowContext) (task.context());
jaegonkim6a7b5242018-09-12 23:09:42 +0900546 Workflow workflow = workflowStore.get(context.workflowId());
547 if (workflow == null) {
548 log.error("execEventTimeoutTask: Invalid workflow {}", context.workflowId());
549 return task;
550 }
551
552 WorkflowContext latestContext = workplaceStore.getContext(context.name());
553 if (latestContext == null) {
554 log.error("execEventTimeoutTask: Invalid workflow context {}", context.name());
555 return task;
556 }
557
558 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900559 if (!Objects.equals(latestContext.current(), task.programCounter())) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900560 log.error("execEventTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
jaegonkime0f45b52018-10-09 20:23:26 +0900561 latestContext.current(), task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900562 return task;
563 }
564
jaegonkime0f45b52018-10-09 20:23:26 +0900565 Worklet worklet = workflow.getWorkletInstance(task.programCounter().workletType());
jaegonkim6a7b5242018-09-12 23:09:42 +0900566 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
567 log.error("execEventTimeoutTask: Current worklet is {}, Ignored", worklet);
568 return task;
569 }
570
571 initWorkletExecution(latestContext);
572
nitinanandf3f94c62019-02-08 10:36:39 +0530573 eventMapStore.unregisterEventMap(task.eventType(), latestContext.name());
jaegonkim6a7b5242018-09-12 23:09:42 +0900574
jaegonkime0f45b52018-10-09 20:23:26 +0900575 log.info("{} worklet.timeout(for event):{}", latestContext.name(), worklet.tag());
576 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
577
578 dataModelInjector.inject(worklet, latestContext);
jaegonkim6a7b5242018-09-12 23:09:42 +0900579 worklet.timeout(latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900580 dataModelInjector.inhale(worklet, latestContext);
581
582 log.info("{} worklet.timeout(for event)(done):{}", latestContext.name(), worklet.tag());
583 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
584
585
jaegonkim6a7b5242018-09-12 23:09:42 +0900586 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
587
588 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900589 log.error("Exception: ", e);
590 latestContext.setCause(e.getMessage());
591 latestContext.setState(WorkflowState.EXCEPTION);
592 workplaceStore.commitContext(latestContext.name(), latestContext, false);
593 } catch (StorageException e) {
594 log.error("Exception: ", e);
595 // StorageException does not commit context.
596 } catch (Exception e) {
597 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900598 latestContext.setCause(e.getMessage());
599 latestContext.setState(WorkflowState.EXCEPTION);
600 workplaceStore.commitContext(latestContext.name(), latestContext, false);
601 }
602
603 return task;
604 }
605
606 /**
607 * Executes timeout task.
608 * @param task time out task
609 * @return handler task
610 */
611 private HandlerTask execTimeoutTask(TimeoutTask task) {
612
613 log.debug("execTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
614
615 WorkflowContext context = (WorkflowContext) (task.context());
616 Workflow workflow = workflowStore.get(context.workflowId());
617 if (workflow == null) {
618 log.error("execTimeoutTask: Invalid workflow {}", context.workflowId());
619 return task;
620 }
621
622 WorkflowContext latestContext = workplaceStore.getContext(context.name());
623 if (latestContext == null) {
624 log.error("execTimeoutTask: Invalid workflow context {}", context.name());
625 return task;
626 }
627
628 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900629 if (!Objects.equals(latestContext.current(), task.programCounter())) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900630 log.error("execTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
jaegonkime0f45b52018-10-09 20:23:26 +0900631 latestContext.current(), task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900632 return task;
633 }
634
jaegonkime0f45b52018-10-09 20:23:26 +0900635 Worklet worklet = workflow.getWorkletInstance(task.programCounter().workletType());
jaegonkim6a7b5242018-09-12 23:09:42 +0900636 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
637 log.error("execTimeoutTask: Current worklet is {}, Ignored", worklet);
638 return task;
639 }
640
641 initWorkletExecution(latestContext);
642
jaegonkime0f45b52018-10-09 20:23:26 +0900643 log.info("{} worklet.timeout:{}", latestContext.name(), worklet.tag());
644 log.trace("{} context: {}", latestContext.name(), latestContext);
645
646 dataModelInjector.inject(worklet, latestContext);
jaegonkim6a7b5242018-09-12 23:09:42 +0900647 worklet.timeout(latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900648 dataModelInjector.inhale(worklet, latestContext);
649
650 log.info("{} worklet.timeout(done):{}", latestContext.name(), worklet.tag());
651 log.trace("{} context: {}", latestContext.name(), latestContext);
652
jaegonkim6a7b5242018-09-12 23:09:42 +0900653 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
654
655 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900656 log.error("Exception: ", e);
657 latestContext.setCause(e.getMessage());
658 latestContext.setState(WorkflowState.EXCEPTION);
659 workplaceStore.commitContext(latestContext.name(), latestContext, false);
660 } catch (StorageException e) {
661 log.error("Exception: ", e);
662 // StorageException does not commit context.
663 } catch (Exception e) {
664 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900665 latestContext.setCause(e.getMessage());
666 latestContext.setState(WorkflowState.EXCEPTION);
667 workplaceStore.commitContext(latestContext.name(), latestContext, false);
668 }
669
670 return task;
671 }
672
673 /**
674 * Class for delegation of workflow batch execution.
675 */
676 private class InternalWorkflowBatchDelegate implements WorkflowBatchDelegate {
677 @Override
678 public void execute(Collection<WorkflowData> operations) {
679 log.debug("Execute {} operation(s).", operations.size());
680
681 CompletableFuture.runAsync(() -> {
682 List<CompletableFuture<WorkflowData>> futures = operations.stream()
683 .map(
684 x -> CompletableFuture.completedFuture(x)
685 .thenApplyAsync(WorkFlowEngine.this::execWorkflow, workflowExecutor)
686 .exceptionally(e -> null)
687 )
688 .collect(Collectors.toList());
689
690 // waiting the completion of futures
691 futures.parallelStream().forEach(x -> x.join());
692
693 }, workflowBatchExecutor).exceptionally(e -> {
694 log.error("Error submitting batches:", e);
695 return null;
696 }).thenRun(workflowAccumulator::ready);
697 }
698 }
699
700 /**
701 * Executes workflow.
702 * @param dataModelContainer workflow data model container(workflow or workplace)
703 * @return
704 */
705 private WorkflowData execWorkflow(WorkflowData dataModelContainer) {
706 if (dataModelContainer instanceof WorkflowContext) {
707 return execWorkflowContext((WorkflowContext) dataModelContainer);
708 } else if (dataModelContainer instanceof Workplace) {
709 return execWorkplace((Workplace) dataModelContainer);
710 } else {
711 log.error("Invalid context {}", dataModelContainer);
712 return null;
713 }
714 }
715
716 /**
717 * Executes workflow context.
718 * @param context workflow context
719 * @return workflow context
720 */
721 private WorkflowContext execWorkflowContext(WorkflowContext context) {
722
723 Workflow workflow = workflowStore.get(context.workflowId());
724 if (workflow == null) {
725 log.error("Invalid workflow {}", context.workflowId());
726 return null;
727 }
728
729 final WorkflowContext latestContext = workplaceStore.getContext(context.name());
730 if (latestContext == null) {
731 log.error("Invalid workflow context {}", context.name());
732 return null;
733 }
734
735 initWorkletExecution(latestContext);
736
737 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900738 final ProgramCounter pc = workflow.next(latestContext);
739 final Worklet worklet = workflow.getWorkletInstance(pc.workletType());
jaegonkim6a7b5242018-09-12 23:09:42 +0900740
741 if (worklet == Worklet.Common.INIT) {
742 log.error("workflow.next gave INIT. It cannot be executed (context: {})", context.name());
743 return latestContext;
744 }
745
jaegonkime0f45b52018-10-09 20:23:26 +0900746 latestContext.setCurrent(pc);
jaegonkim6a7b5242018-09-12 23:09:42 +0900747 if (worklet == Worklet.Common.COMPLETED) {
748
749 if (workflow.attributes().contains(REMOVE_AFTER_COMPLETE)) {
750 workplaceStore.removeContext(latestContext.name());
751 return null;
752 } else {
753 latestContext.setState(WorkflowState.IDLE);
754 workplaceStore.commitContext(latestContext.name(), latestContext, false);
755 return latestContext;
756 }
757 }
758
jaegonkime0f45b52018-10-09 20:23:26 +0900759 log.info("{} worklet.process:{}", latestContext.name(), worklet.tag());
760 log.trace("{} context: {}", latestContext.name(), latestContext);
761
762 dataModelInjector.inject(worklet, latestContext);
jaegonkim6a7b5242018-09-12 23:09:42 +0900763 worklet.process(latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900764 dataModelInjector.inhale(worklet, latestContext);
765
766 log.info("{} worklet.process(done): {}", latestContext.name(), worklet.tag());
767 log.trace("{} context: {}", latestContext.name(), latestContext);
768
jaegonkim6a7b5242018-09-12 23:09:42 +0900769
770 if (latestContext.completionEventType() != null) {
771 if (latestContext.completionEventGenerator() == null) {
772 String msg = String.format("Invalid exepecting event(%s), generator(%s)",
773 latestContext.completionEventType(),
774 latestContext.completionEventGenerator());
775 throw new WorkflowException(msg);
776 }
777
nitinanandf3f94c62019-02-08 10:36:39 +0530778 registerEventMap(latestContext.completionEventType(), latestContext.completionEventHints(),
jaegonkime0f45b52018-10-09 20:23:26 +0900779 latestContext.name(), pc.toString());
jaegonkim6a7b5242018-09-12 23:09:42 +0900780
781 latestContext.completionEventGenerator().apply();
782
783 if (latestContext.completionEventTimeout() != 0L) {
784 final EventTimeoutTask eventTimeoutTask = EventTimeoutTask.builder()
785 .context(latestContext)
jaegonkime0f45b52018-10-09 20:23:26 +0900786 .programCounter(pc)
jaegonkim6a7b5242018-09-12 23:09:42 +0900787 .eventType(latestContext.completionEventType().getName())
nitinanandf3f94c62019-02-08 10:36:39 +0530788 .eventHintSet(latestContext.completionEventHints())
jaegonkim6a7b5242018-09-12 23:09:42 +0900789 .build();
790 timerChain.schedule(latestContext.completionEventTimeout(),
791 () -> {
792 eventtaskAccumulator.add(eventTimeoutTask);
793 });
794 }
795 } else {
796 if (latestContext.completionEventTimeout() != 0L) {
797 final TimeoutTask timeoutTask = TimeoutTask.builder()
798 .context(latestContext)
jaegonkime0f45b52018-10-09 20:23:26 +0900799 .programCounter(pc)
jaegonkim6a7b5242018-09-12 23:09:42 +0900800 .build();
801
802 timerChain.schedule(latestContext.completionEventTimeout(),
803 () -> {
804 eventtaskAccumulator.add(timeoutTask);
805 });
jaegonkime0f45b52018-10-09 20:23:26 +0900806 } else {
807 //completed case
808 // increase program counter
809 latestContext.setCurrent(workflow.increased(pc));
jaegonkim6a7b5242018-09-12 23:09:42 +0900810 }
811 }
812
813 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
814
815 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900816 log.error("Exception: ", e);
817 latestContext.setCause(e.getMessage());
818 latestContext.setState(WorkflowState.EXCEPTION);
819 workplaceStore.commitContext(latestContext.name(), latestContext, false);
820 } catch (StorageException e) {
821 log.error("Exception: ", e);
822 // StorageException does not commit context.
823 } catch (Exception e) {
824 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900825 latestContext.setCause(e.getMessage());
826 latestContext.setState(WorkflowState.EXCEPTION);
827 workplaceStore.commitContext(latestContext.name(), latestContext, false);
828 }
829
830 return latestContext;
831 }
832
833 /**
834 * Execute workplace.
835 * @param workplace workplace
836 * @return workplace
837 */
838 private Workplace execWorkplace(Workplace workplace) {
839
840 return null;
841 }
842
jaegonkime0f45b52018-10-09 20:23:26 +0900843}