blob: d746985d14ce74c8f6f6e717867cd1db659190e3 [file] [log] [blame]
jaegonkim6a7b5242018-09-12 23:09:42 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.node.JsonNodeFactory;
jaegonkim6a7b5242018-09-12 23:09:42 +090019import org.onosproject.cluster.ClusterService;
20import org.onosproject.cluster.LeadershipService;
21import org.onosproject.cluster.NodeId;
22import org.onosproject.core.ApplicationId;
23import org.onosproject.core.CoreService;
jaegonkime0f45b52018-10-09 20:23:26 +090024import org.onosproject.store.service.StorageException;
jaegonkim6a7b5242018-09-12 23:09:42 +090025import org.onosproject.workflow.api.DefaultWorkplace;
26import org.onosproject.workflow.api.EventHintSupplier;
27import org.onosproject.workflow.api.EventTask;
jaegonkime0f45b52018-10-09 20:23:26 +090028import org.onosproject.workflow.api.JsonDataModelInjector;
jaegonkim6a7b5242018-09-12 23:09:42 +090029import org.onosproject.workflow.api.JsonDataModelTree;
jaegonkime0f45b52018-10-09 20:23:26 +090030import org.onosproject.workflow.api.ProgramCounter;
jaegonkim6a7b5242018-09-12 23:09:42 +090031import org.onosproject.workflow.api.SystemWorkflowContext;
32import org.onosproject.workflow.api.EventTimeoutTask;
33import org.onosproject.workflow.api.TimeoutTask;
34import org.onosproject.workflow.api.TimerChain;
nitinanandc8b70252019-04-17 15:35:43 +053035import org.onosproject.workflow.api.TriggerWorklet;
36import org.onosproject.workflow.api.WorkflowEventMetaData;
jaegonkim6a7b5242018-09-12 23:09:42 +090037import org.onosproject.workflow.api.Worklet;
38import org.onosproject.workflow.api.Workflow;
39import org.onosproject.workflow.api.WorkflowContext;
40import org.onosproject.workflow.api.WorkflowData;
41import org.onosproject.workflow.api.ContextEventMapStore;
42import org.onosproject.workflow.api.WorkflowState;
43import org.onosproject.workflow.api.WorkflowStore;
44import org.onosproject.workflow.api.WorkflowBatchDelegate;
45import org.onosproject.workflow.api.WorkflowDataEvent;
46import org.onosproject.workflow.api.WorkflowDataListener;
47import org.onosproject.workflow.api.WorkflowException;
48import org.onosproject.workflow.api.HandlerTask;
49import org.onosproject.workflow.api.HandlerTaskBatchDelegate;
50import org.onosproject.workflow.api.Workplace;
51import org.onosproject.workflow.api.WorkplaceStore;
52import org.onosproject.workflow.api.WorkplaceStoreDelegate;
53import org.onosproject.workflow.api.WorkflowExecutionService;
m.rahil09251882019-04-15 22:58:33 +053054import org.onosproject.workflow.api.WorkletDescription;
55import org.onosproject.workflow.api.StaticDataModelInjector;
jaegonkim6a7b5242018-09-12 23:09:42 +090056import org.onosproject.event.AbstractListenerManager;
57import org.onosproject.event.Event;
58import org.onosproject.net.intent.WorkPartitionService;
Ray Milkeydf521292018-10-04 15:13:33 -070059import org.osgi.service.component.annotations.Activate;
60import org.osgi.service.component.annotations.Component;
61import org.osgi.service.component.annotations.Deactivate;
62import org.osgi.service.component.annotations.Reference;
63import org.osgi.service.component.annotations.ReferenceCardinality;
jaegonkim6a7b5242018-09-12 23:09:42 +090064import org.slf4j.Logger;
65
66import java.util.Collection;
67import java.util.List;
68import java.util.Map;
69import java.util.Objects;
nitinanandf3f94c62019-02-08 10:36:39 +053070import java.util.Set;
jaegonkim6a7b5242018-09-12 23:09:42 +090071import java.util.UUID;
72import java.util.concurrent.CompletableFuture;
73import java.util.concurrent.ExecutorService;
74import java.util.concurrent.ScheduledExecutorService;
75import java.util.stream.Collectors;
76
77import static java.util.concurrent.Executors.newFixedThreadPool;
78import static java.util.concurrent.Executors.newSingleThreadExecutor;
79import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
80import static org.onlab.util.Tools.groupedThreads;
jaegonkim566991c2020-03-08 08:52:23 +090081import static org.onosproject.workflow.api.CheckCondition.check;
jaegonkim6a7b5242018-09-12 23:09:42 +090082import static org.onosproject.workflow.api.WorkflowAttribute.REMOVE_AFTER_COMPLETE;
83import static org.slf4j.LoggerFactory.getLogger;
84
Ray Milkeydf521292018-10-04 15:13:33 -070085@Component(immediate = true, service = WorkflowExecutionService.class)
jaegonkim6a7b5242018-09-12 23:09:42 +090086public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, WorkflowDataListener>
87 implements WorkflowExecutionService {
88
89 protected static final Logger log = getLogger(WorkFlowEngine.class);
Ray Milkeydf521292018-10-04 15:13:33 -070090 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090091 protected CoreService coreService;
92
Ray Milkeydf521292018-10-04 15:13:33 -070093 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090094 protected ClusterService clusterService;
95
Ray Milkeydf521292018-10-04 15:13:33 -070096 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090097 protected LeadershipService leadershipService;
98
Ray Milkeydf521292018-10-04 15:13:33 -070099 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +0900100 protected WorkPartitionService partitionService;
101
Ray Milkeydf521292018-10-04 15:13:33 -0700102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +0900103 protected WorkplaceStore workplaceStore;
104
Ray Milkeydf521292018-10-04 15:13:33 -0700105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +0900106 protected WorkflowStore workflowStore;
107
Ray Milkeydf521292018-10-04 15:13:33 -0700108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +0900109 protected ContextEventMapStore eventMapStore;
110
111 private final WorkplaceStoreDelegate workplaceStoreDelegate = this::post;
112
113 private final WorkflowBatchDelegate workflowBatchDelegate = new InternalWorkflowBatchDelegate();
114 private final WorkflowAccumulator workflowAccumulator = new WorkflowAccumulator(workflowBatchDelegate);
115
116 private final HandlerTaskBatchDelegate eventtaskBatchDelegate = new InternalHandlerTaskBatchDelegate();
117 private final HandlerTaskAccumulator eventtaskAccumulator = new HandlerTaskAccumulator(eventtaskBatchDelegate);
118
119 private ExecutorService workflowBatchExecutor;
120 private ExecutorService workflowExecutor;
121
122 private ExecutorService handlerTaskBatchExecutor;
123 private ExecutorService handlerTaskExecutor;
124
125 private static final int DEFAULT_WORKFLOW_THREADS = 12;
126 private static final int DEFAULT_EVENTTASK_THREADS = 12;
127 private static final int MAX_REGISTER_EVENTMAP_WAITS = 10;
128
129 private ScheduledExecutorService eventMapTriggerExecutor;
130
131 private TimerChain timerChain = new TimerChain();
132
jaegonkime0f45b52018-10-09 20:23:26 +0900133 private JsonDataModelInjector dataModelInjector = new JsonDataModelInjector();
m.rahil09251882019-04-15 22:58:33 +0530134 private StaticDataModelInjector staticDataModelInjector = new StaticDataModelInjector();
jaegonkime0f45b52018-10-09 20:23:26 +0900135
jaegonkim6a7b5242018-09-12 23:09:42 +0900136 public static final String APPID = "org.onosproject.workflow";
137 private ApplicationId appId;
138 private NodeId localNodeId;
139
140 @Activate
141 public void activate() {
142 appId = coreService.registerApplication(APPID);
143 workplaceStore.setDelegate(workplaceStoreDelegate);
144 localNodeId = clusterService.getLocalNode().id();
145 leadershipService.runForLeadership(appId.name());
146
147 workflowBatchExecutor = newSingleThreadExecutor(
148 groupedThreads("onos/workflow", "workflow-batch", log));
149 workflowExecutor = newFixedThreadPool(DEFAULT_WORKFLOW_THREADS,
150 groupedThreads("onos/workflow-exec", "worker-%d", log));
151 handlerTaskBatchExecutor = newSingleThreadExecutor(
152 groupedThreads("onos/workflow", "handlertask-batch", log));
153 handlerTaskExecutor = newFixedThreadPool(DEFAULT_EVENTTASK_THREADS,
154 groupedThreads("onos/handlertask-exec", "worker-%d", log));
155 eventMapTriggerExecutor = newSingleThreadScheduledExecutor(
156 groupedThreads("onos/workflow-engine", "eventmap-trigger-executor"));
157
158 (new WorkplaceWorkflow(this, workflowStore)).registerWorkflows();
159 JsonDataModelTree data = new JsonDataModelTree(JsonNodeFactory.instance.objectNode());
160 workplaceStore.registerWorkplace(Workplace.SYSTEM_WORKPLACE,
161 new DefaultWorkplace(Workplace.SYSTEM_WORKPLACE, data));
162
163 log.info("Started");
164 }
165
166 @Deactivate
167 public void deactivate() {
168 leadershipService.withdraw(appId.name());
169 workplaceStore.unsetDelegate(workplaceStoreDelegate);
170 workflowBatchExecutor.shutdown();
171 workflowExecutor.shutdown();
172 handlerTaskBatchExecutor.shutdown();
173 handlerTaskExecutor.shutdown();
174 eventMapTriggerExecutor.shutdown();
175 log.info("Stopped");
176 }
177
178 @Override
179 public void execInitWorklet(WorkflowContext context) {
180
181 Workflow workflow = workflowStore.get(context.workflowId());
182 if (workflow == null) {
183 log.error("Invalid workflow {}", context.workflowId());
184 return;
185 }
186
187 initWorkletExecution(context);
188 try {
189 Worklet initWorklet = workflow.init(context);
190 if (initWorklet != null) {
jaegonkime0f45b52018-10-09 20:23:26 +0900191
192 log.info("{} worklet.process:{}", context.name(), initWorklet.tag());
193 log.trace("{} context: {}", context.name(), context);
194
195 dataModelInjector.inject(initWorklet, context);
jaegonkim6a7b5242018-09-12 23:09:42 +0900196 initWorklet.process(context);
jaegonkime0f45b52018-10-09 20:23:26 +0900197 dataModelInjector.inhale(initWorklet, context);
198
199 log.info("{} worklet.process(done): {}", context.name(), initWorklet.tag());
200 log.trace("{} context: {}", context.name(), context);
jaegonkim6a7b5242018-09-12 23:09:42 +0900201 }
202
jaegonkim566991c2020-03-08 08:52:23 +0900203 check(workplaceStore.getContext(context.name()) == null,
204 "Duplicated workflow context(" + context.name() + ") assignment.");
205
jaegonkim6a7b5242018-09-12 23:09:42 +0900206 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900207 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900208 context.setCause(e.getMessage());
209 context.setState(WorkflowState.EXCEPTION);
210 workplaceStore.commitContext(context.name(), context, false);
211 return;
212 }
213 // trigger the execution of next worklet.
214 workplaceStore.registerContext(context.name(), context);
215 }
216
217 @Override
jaegonkime0f45b52018-10-09 20:23:26 +0900218 public void eval(String contextName) {
219
220 final WorkflowContext latestContext = workplaceStore.getContext(contextName);
221 if (latestContext == null) {
222 log.error("Invalid workflow context {}", contextName);
223 return;
224 }
225
226 initWorkletExecution(latestContext);
227
228 workplaceStore.commitContext(latestContext.name(), latestContext, true);
nitinanandc8b70252019-04-17 15:35:43 +0530229
jaegonkime0f45b52018-10-09 20:23:26 +0900230 }
231
232 @Override
jaegonkim6a7b5242018-09-12 23:09:42 +0900233 public void eventMapTrigger(Event event, EventHintSupplier supplier) {
234
235 if (event.subject() instanceof SystemWorkflowContext) {
236 return;
237 }
238
nitinanandc8b70252019-04-17 15:35:43 +0530239 Map<String, WorkflowEventMetaData> eventMap;
jaegonkim6a7b5242018-09-12 23:09:42 +0900240
241 String eventHint;
242 try {
243 eventHint = supplier.apply(event);
244 } catch (Throwable e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900245 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900246 return;
247 }
nitinanandc8b70252019-04-17 15:35:43 +0530248
jaegonkim6a7b5242018-09-12 23:09:42 +0900249 if (eventHint == null) {
250 // do nothing
251 log.error("Invalid eventHint, event: {}", event);
252 return;
253 }
254
255 try {
nitinanandf3f94c62019-02-08 10:36:39 +0530256 eventMap = eventMapStore.getEventMapByHint(event.getClass().getName(), eventHint);
nitinanandc8b70252019-04-17 15:35:43 +0530257 if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
258 // do nothing;
259 log.debug("Invalid eventMap, event: {}", event);
jaegonkime0f45b52018-10-09 20:23:26 +0900260 return;
261 }
262
nitinanandc8b70252019-04-17 15:35:43 +0530263 for (Map.Entry<String, WorkflowEventMetaData> entry : eventMap.entrySet()) {
264 String contextName = entry.getKey();
265 ProgramCounter pc = ProgramCounter.valueOf("INVALID_WORKLET", 0);
266 WorkflowContext context = null;
267
268 context = workplaceStore.getContext(contextName);
269
270 if (Objects.isNull(context)) {
271 log.info("Invalid context: {}, event: {}", contextName, event);
272 continue;
273 }
274
275 EventTask eventtask = null;
276 if (eventMapStore.isTriggerSet(event.getClass().getName(), eventHint, contextName)) {
277 try {
278 eventtask = EventTask.builder()
279 .event(event)
280 .eventHint(eventHint)
281 .context(context)
282 .programCounter(pc)
283 .build();
284 } catch (WorkflowException e) {
285 log.error("Exception: ", e);
286 }
287
288 log.debug("eventtaskAccumulator.add: task: {}", eventtask);
289 if (!Objects.isNull(eventtask)) {
290 eventtaskAccumulator.add(eventtask);
291 }
292 }
293 /*Both type of event is being scheduled here if applicable.
294 If worfklow trigger event is set but may not be a valid one for current event type,
295 then normal worklet event should be processed if applicable. But validity of workflow
296 trigger event would be checked later, so as of now both kind of event would be scheduled.
297 later while trigger event processing, its validity is found to be true, then worklet events
298 would be unregistered and eventually these events wont be processed.*/
299 if (eventMapStore.isEventMapPresent(contextName)) {
300 try {
301 pc = entry.getValue().getProgramCounter();
302 } catch (IllegalArgumentException e) {
303 log.error("Exception: ", e);
304 continue;
305 }
306 try {
307 eventtask = EventTask.builder()
308 .event(event)
309 .eventHint(eventHint)
310 .context(context)
311 .programCounter(pc)
312 .build();
313 } catch (WorkflowException e) {
314 log.error("Exception: ", e);
315 }
316
317 log.debug("eventtaskAccumulator.add: task: {}", eventtask);
318 if (!Objects.isNull(eventtask)) {
319 eventtaskAccumulator.add(eventtask);
320 }
321
322 }
jaegonkime0f45b52018-10-09 20:23:26 +0900323 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900324
nitinanandc8b70252019-04-17 15:35:43 +0530325 } catch (WorkflowException we) {
326 log.error("Exception {} occured in fetching contexts for trigger event {}", we, event);
jaegonkim6a7b5242018-09-12 23:09:42 +0900327 }
328 }
329
330 @Override
nitinanandf3f94c62019-02-08 10:36:39 +0530331 public void registerEventMap(Class<? extends Event> eventType, Set<String> eventHintSet,
nitinanandc8b70252019-04-17 15:35:43 +0530332 String contextName, ProgramCounter programCounter) throws WorkflowException {
333 eventMapStore.registerEventMap(eventType.getName(), eventHintSet, contextName, programCounter);
nitinanandf3f94c62019-02-08 10:36:39 +0530334 for (String eventHint : eventHintSet) {
335 for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
nitinanandc8b70252019-04-17 15:35:43 +0530336 Map<String, WorkflowEventMetaData> eventMap =
337 eventMapStore.getEventMapByHint(eventType.getName(), eventHint);
nitinanandf3f94c62019-02-08 10:36:39 +0530338 if (eventMap != null && eventMap.containsKey(contextName)) {
339 break;
340 }
341 try {
342 log.info("sleep {}", i);
343 Thread.sleep(10L * (i + 1));
344 } catch (InterruptedException e) {
345 log.error("Exception: ", e);
346 Thread.currentThread().interrupt();
347 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900348 }
349 }
nitinanandf3f94c62019-02-08 10:36:39 +0530350
jaegonkim6a7b5242018-09-12 23:09:42 +0900351 }
352
353 @Override
354 protected void post(WorkflowDataEvent event) {
355
356 if (event.subject() == null || !isRelevant(event.subject())) {
357 log.debug("ignore event {}", event);
358 return;
359 }
360
361 // trigger next worklet selection
362 WorkflowData dataModelContainer = event.subject();
363 switch (event.type()) {
364 case INSERT:
365 case UPDATE:
366 if (dataModelContainer.triggerNext()) {
jaegonkime0f45b52018-10-09 20:23:26 +0900367 log.debug("workflowAccumulator.add: {}", dataModelContainer);
jaegonkim6a7b5242018-09-12 23:09:42 +0900368 workflowAccumulator.add(dataModelContainer);
369 } else {
370 log.debug("pass-workflowAccumulator.add: {}", dataModelContainer);
371 }
372 break;
373 case REMOVE:
374 break;
375 default:
376 }
377
378 // trigger EventTask for WorkflowDataEvent
379 eventMapTriggerExecutor.submit(
380 () -> eventMapTrigger(
381 event,
382 // event hint supplier
383 (ev) -> {
384 if (ev == null || ev.subject() == null) {
385 return null;
386 }
387
388 if (ev.subject() instanceof WorkflowData) {
389 return ((WorkflowData) ev.subject()).name();
390 } else {
391 return null;
392 }
393 }
394 )
395 );
396 }
397
398 /**
399 * Checks whether this workflow data job is relevant to this ONOS node.
m.rahil09251882019-04-15 22:58:33 +0530400 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900401 * @param job workflow data
402 * @return checking result
403 */
404 private boolean isRelevant(WorkflowData job) {
405 // distributes event processing with work-partition
406 return partitionService.isMine(job.distributor(), this::stringHash);
407 }
408
409 /**
410 * Gets hash of the string.
m.rahil09251882019-04-15 22:58:33 +0530411 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900412 * @param str string to get a hash
413 * @return hash value
414 */
415 public Long stringHash(String str) {
416 return UUID.nameUUIDFromBytes(str.getBytes()).getMostSignificantBits();
417 }
418
419 /**
420 * Class for handler task batch delegation.
421 */
422 private class InternalHandlerTaskBatchDelegate implements HandlerTaskBatchDelegate {
423 @Override
424 public void execute(Collection<Collection<HandlerTask>> operations) {
425 log.debug("Execute {} operation(s).", operations.size());
426
427 CompletableFuture.runAsync(() -> {
428 List<CompletableFuture<Collection<HandlerTask>>> futures = operations.stream()
429 .map(
430 x -> CompletableFuture.completedFuture(x)
431 .thenApplyAsync(WorkFlowEngine.this::processHandlerTask, handlerTaskExecutor)
432 .exceptionally(e -> null)
433 )
434 .collect(Collectors.toList());
435
436 // waiting the completion of futures
437 futures.parallelStream().forEach(x -> x.join());
438
439 }, handlerTaskBatchExecutor).exceptionally(e -> {
440 log.error("Error submitting batches:", e);
441 return null;
442 }).thenRun(eventtaskAccumulator::ready);
443 }
444 }
445
446 /**
447 * Initializes worklet execution.
m.rahil09251882019-04-15 22:58:33 +0530448 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900449 * @param context workflow context
450 */
451 private void initWorkletExecution(WorkflowContext context) {
452 context.setState(WorkflowState.RUNNING);
453 context.setCause("");
454 context.setWorkflowExecutionService(this);
455 context.setWorkflowStore(workflowStore);
456 context.setWorkplaceStore(workplaceStore);
nitinanandc8b70252019-04-17 15:35:43 +0530457 context.setEventMapStore(eventMapStore);
jaegonkim6a7b5242018-09-12 23:09:42 +0900458 context.waitCompletion(null, null, null, 0L);
459 context.setTriggerNext(false);
460 }
461
462 /**
463 * Processes handler tasks.
m.rahil09251882019-04-15 22:58:33 +0530464 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900465 * @param tasks handler tasks
466 * @return handler tasks processed
467 */
468 private Collection<HandlerTask> processHandlerTask(Collection<HandlerTask> tasks) {
469
470 for (HandlerTask task : tasks) {
471 if (task instanceof EventTimeoutTask) {
472 execEventTimeoutTask((EventTimeoutTask) task);
473 } else if (task instanceof TimeoutTask) {
474 execTimeoutTask((TimeoutTask) task);
475 } else if (task instanceof EventTask) {
476 execEventTask((EventTask) task);
477 } else {
478 log.error("Unsupported handler task {}", task);
479 }
480 }
481
482 return null;
483 }
484
485 /**
486 * Executes event task.
m.rahil09251882019-04-15 22:58:33 +0530487 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900488 * @param task event task
489 * @return event task
490 */
491 private EventTask execEventTask(EventTask task) {
492
nitinanandc8b70252019-04-17 15:35:43 +0530493 WorkflowContext context = (WorkflowContext) (task.context());
494 String cxtName = context.name();
495 try {
496 if (eventMapStore.isTriggerSet(task.event().getClass().getName(), task.eventHint(), cxtName)) {
497 WorkflowContext workflowContext = workplaceStore.getContext(cxtName);
498 Workflow workflow = workflowStore.get(workflowContext.workflowId());
499 String triggerWorkletName = workflow.getTriggerWorkletClassName().get();
500 Worklet worklet = workflow.getTriggerWorkletInstance(triggerWorkletName);
501 if (worklet instanceof TriggerWorklet) {
502 if (((TriggerWorklet) worklet).isTriggerValid(workflowContext, task.event())) {
503 if (Objects.nonNull(workflowContext.completionEventType())) {
504 eventMapStore.unregisterEventMap(workflowContext.completionEventType().getName(),
505 workflowContext.name());
506 }
507 initWorkletExecution(workflowContext);
508 workflowContext.setCurrent(ProgramCounter.INIT_PC);
509 workplaceStore.commitContext(cxtName, workflowContext, true);
510 }
511 }
512 }
513
514 } catch (WorkflowException we) {
515 log.error("Error Occurred in validating trigger for eventType {} eventHint {} context name {}",
516 task.eventType(), task.eventHint(), cxtName);
517 }
518
519 if (!eventMapStore.isEventMapPresent(cxtName)) {
520 log.trace("EventMap doesnt exist for taskcontext:{}", cxtName);
jaegonkim6a7b5242018-09-12 23:09:42 +0900521 return task;
522 }
523
524 log.debug("execEventTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
525
jaegonkim6a7b5242018-09-12 23:09:42 +0900526 Workflow workflow = workflowStore.get(context.workflowId());
527 if (workflow == null) {
528 log.error("Invalid workflow {}", context.workflowId());
529 return task;
530 }
531
532 WorkflowContext latestContext = workplaceStore.getContext(context.name());
533 if (latestContext == null) {
534 log.error("Invalid workflow context {}", context.name());
535 return task;
536 }
537
538 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900539 if (!Objects.equals(latestContext.current(), task.programCounter())) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900540 log.error("Current worklet({}) is not mismatched with task work({}). Ignored.",
jaegonkime0f45b52018-10-09 20:23:26 +0900541 latestContext.current(), task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900542 return task;
543 }
544
jaegonkimf85ee3c2019-04-21 11:10:25 +0900545 Worklet worklet = workflow.getWorkletInstance(task.programCounter());
jaegonkime0f45b52018-10-09 20:23:26 +0900546 if (Worklet.Common.COMPLETED.equals(worklet) || Worklet.Common.INIT.equals(worklet)) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900547 log.error("Current worklet is {}, Ignored", worklet);
548 return task;
549 }
550
551 initWorkletExecution(latestContext);
552
jaegonkime0f45b52018-10-09 20:23:26 +0900553 log.info("{} worklet.isCompleted:{}", latestContext.name(), worklet.tag());
554 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
555
556 dataModelInjector.inject(worklet, latestContext);
557 boolean completed = worklet.isCompleted(latestContext, task.event());
558 dataModelInjector.inhale(worklet, latestContext);
559
560 if (completed) {
561
562 log.info("{} worklet.isCompleted(true):{}", latestContext.name(), worklet.tag());
563 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
564
jaegonkim6a7b5242018-09-12 23:09:42 +0900565 eventMapStore.unregisterEventMap(
nitinanandf3f94c62019-02-08 10:36:39 +0530566 task.eventType(), latestContext.name());
jaegonkim6a7b5242018-09-12 23:09:42 +0900567
jaegonkime0f45b52018-10-09 20:23:26 +0900568 //completed case
nitinanandf3f94c62019-02-08 10:36:39 +0530569 //increase program counter
jaegonkime0f45b52018-10-09 20:23:26 +0900570 ProgramCounter pc = latestContext.current();
571 latestContext.setCurrent(workflow.increased(pc));
572
jaegonkim6a7b5242018-09-12 23:09:42 +0900573 workplaceStore.commitContext(latestContext.name(), latestContext, true);
574 return null;
575 } else {
jaegonkime0f45b52018-10-09 20:23:26 +0900576
577 log.info("{} worklet.isCompleted(false):{}", latestContext.name(), worklet.tag());
578 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
579
jaegonkim6a7b5242018-09-12 23:09:42 +0900580 workplaceStore.commitContext(latestContext.name(), latestContext, false);
581 }
582
583 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900584 log.error("Exception: ", e);
585 latestContext.setCause(e.getMessage());
586 latestContext.setState(WorkflowState.EXCEPTION);
587 workplaceStore.commitContext(latestContext.name(), latestContext, false);
588 } catch (StorageException e) {
589 log.error("Exception: ", e);
590 // StorageException does not commit context.
591 } catch (Exception e) {
592 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900593 latestContext.setCause(e.getMessage());
594 latestContext.setState(WorkflowState.EXCEPTION);
595 workplaceStore.commitContext(latestContext.name(), latestContext, false);
596 }
597
598 return task;
599 }
600
601 /**
602 * Executes event timeout task.
m.rahil09251882019-04-15 22:58:33 +0530603 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900604 * @param task event timeout task
605 * @return handler task
606 */
607 private HandlerTask execEventTimeoutTask(EventTimeoutTask task) {
608
nitinanandf3f94c62019-02-08 10:36:39 +0530609 if (!eventMapStore.isEventMapPresent(task.context().name())) {
610 log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name());
jaegonkim6a7b5242018-09-12 23:09:42 +0900611 return task;
612 }
613
614 log.debug("execEventTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
615
jaegonkime0f45b52018-10-09 20:23:26 +0900616 WorkflowContext context = (WorkflowContext) (task.context());
jaegonkim6a7b5242018-09-12 23:09:42 +0900617 Workflow workflow = workflowStore.get(context.workflowId());
618 if (workflow == null) {
619 log.error("execEventTimeoutTask: Invalid workflow {}", context.workflowId());
620 return task;
621 }
622
623 WorkflowContext latestContext = workplaceStore.getContext(context.name());
624 if (latestContext == null) {
625 log.error("execEventTimeoutTask: Invalid workflow context {}", context.name());
626 return task;
627 }
628
629 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900630 if (!Objects.equals(latestContext.current(), task.programCounter())) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900631 log.error("execEventTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
jaegonkime0f45b52018-10-09 20:23:26 +0900632 latestContext.current(), task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900633 return task;
634 }
635
jaegonkimf85ee3c2019-04-21 11:10:25 +0900636 Worklet worklet = workflow.getWorkletInstance(task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900637 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
638 log.error("execEventTimeoutTask: Current worklet is {}, Ignored", worklet);
639 return task;
640 }
641
642 initWorkletExecution(latestContext);
643
nitinanandf3f94c62019-02-08 10:36:39 +0530644 eventMapStore.unregisterEventMap(task.eventType(), latestContext.name());
jaegonkim6a7b5242018-09-12 23:09:42 +0900645
jaegonkime0f45b52018-10-09 20:23:26 +0900646 log.info("{} worklet.timeout(for event):{}", latestContext.name(), worklet.tag());
647 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
648
649 dataModelInjector.inject(worklet, latestContext);
m.rahil09251882019-04-15 22:58:33 +0530650
651 WorkletDescription workletDesc = workflow.getWorkletDesc(task.programCounter());
652 if (Objects.nonNull(workletDesc)) {
653 if (!(workletDesc.tag().equals("INIT") || workletDesc.tag().equals("COMPLETED"))) {
654 staticDataModelInjector.inject(worklet, workletDesc);
655 }
656 }
657
jaegonkim6a7b5242018-09-12 23:09:42 +0900658 worklet.timeout(latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900659 dataModelInjector.inhale(worklet, latestContext);
660
661 log.info("{} worklet.timeout(for event)(done):{}", latestContext.name(), worklet.tag());
662 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
663
664
jaegonkim6a7b5242018-09-12 23:09:42 +0900665 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
666
667 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900668 log.error("Exception: ", e);
669 latestContext.setCause(e.getMessage());
670 latestContext.setState(WorkflowState.EXCEPTION);
671 workplaceStore.commitContext(latestContext.name(), latestContext, false);
672 } catch (StorageException e) {
673 log.error("Exception: ", e);
674 // StorageException does not commit context.
675 } catch (Exception e) {
676 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900677 latestContext.setCause(e.getMessage());
678 latestContext.setState(WorkflowState.EXCEPTION);
679 workplaceStore.commitContext(latestContext.name(), latestContext, false);
680 }
681
682 return task;
683 }
684
685 /**
686 * Executes timeout task.
m.rahil09251882019-04-15 22:58:33 +0530687 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900688 * @param task time out task
689 * @return handler task
690 */
691 private HandlerTask execTimeoutTask(TimeoutTask task) {
692
693 log.debug("execTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
694
695 WorkflowContext context = (WorkflowContext) (task.context());
696 Workflow workflow = workflowStore.get(context.workflowId());
697 if (workflow == null) {
698 log.error("execTimeoutTask: Invalid workflow {}", context.workflowId());
699 return task;
700 }
701
702 WorkflowContext latestContext = workplaceStore.getContext(context.name());
703 if (latestContext == null) {
704 log.error("execTimeoutTask: Invalid workflow context {}", context.name());
705 return task;
706 }
707
708 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900709 if (!Objects.equals(latestContext.current(), task.programCounter())) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900710 log.error("execTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
jaegonkime0f45b52018-10-09 20:23:26 +0900711 latestContext.current(), task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900712 return task;
713 }
714
jaegonkimf85ee3c2019-04-21 11:10:25 +0900715 Worklet worklet = workflow.getWorkletInstance(task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900716 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
717 log.error("execTimeoutTask: Current worklet is {}, Ignored", worklet);
718 return task;
719 }
720
721 initWorkletExecution(latestContext);
722
jaegonkime0f45b52018-10-09 20:23:26 +0900723 log.info("{} worklet.timeout:{}", latestContext.name(), worklet.tag());
724 log.trace("{} context: {}", latestContext.name(), latestContext);
725
726 dataModelInjector.inject(worklet, latestContext);
m.rahil09251882019-04-15 22:58:33 +0530727
728 WorkletDescription workletDesc = workflow.getWorkletDesc(task.programCounter());
729 if (Objects.nonNull(workletDesc)) {
730 if (!(workletDesc.tag().equals("INIT") || workletDesc.tag().equals("COMPLETED"))) {
731 staticDataModelInjector.inject(worklet, workletDesc);
732 }
733 }
734
jaegonkim6a7b5242018-09-12 23:09:42 +0900735 worklet.timeout(latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900736 dataModelInjector.inhale(worklet, latestContext);
737
738 log.info("{} worklet.timeout(done):{}", latestContext.name(), worklet.tag());
739 log.trace("{} context: {}", latestContext.name(), latestContext);
740
jaegonkim6a7b5242018-09-12 23:09:42 +0900741 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
742
743 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900744 log.error("Exception: ", e);
745 latestContext.setCause(e.getMessage());
746 latestContext.setState(WorkflowState.EXCEPTION);
747 workplaceStore.commitContext(latestContext.name(), latestContext, false);
748 } catch (StorageException e) {
749 log.error("Exception: ", e);
750 // StorageException does not commit context.
751 } catch (Exception e) {
752 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900753 latestContext.setCause(e.getMessage());
754 latestContext.setState(WorkflowState.EXCEPTION);
755 workplaceStore.commitContext(latestContext.name(), latestContext, false);
756 }
757
758 return task;
759 }
760
761 /**
762 * Class for delegation of workflow batch execution.
763 */
764 private class InternalWorkflowBatchDelegate implements WorkflowBatchDelegate {
765 @Override
766 public void execute(Collection<WorkflowData> operations) {
767 log.debug("Execute {} operation(s).", operations.size());
768
769 CompletableFuture.runAsync(() -> {
770 List<CompletableFuture<WorkflowData>> futures = operations.stream()
771 .map(
772 x -> CompletableFuture.completedFuture(x)
m.rahil09251882019-04-15 22:58:33 +0530773 .thenApplyAsync(WorkFlowEngine.this::execWorkflow, workflowExecutor)
774 .exceptionally(e -> null)
jaegonkim6a7b5242018-09-12 23:09:42 +0900775 )
776 .collect(Collectors.toList());
777
778 // waiting the completion of futures
779 futures.parallelStream().forEach(x -> x.join());
780
781 }, workflowBatchExecutor).exceptionally(e -> {
782 log.error("Error submitting batches:", e);
783 return null;
784 }).thenRun(workflowAccumulator::ready);
785 }
786 }
787
788 /**
789 * Executes workflow.
m.rahil09251882019-04-15 22:58:33 +0530790 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900791 * @param dataModelContainer workflow data model container(workflow or workplace)
792 * @return
793 */
794 private WorkflowData execWorkflow(WorkflowData dataModelContainer) {
795 if (dataModelContainer instanceof WorkflowContext) {
796 return execWorkflowContext((WorkflowContext) dataModelContainer);
797 } else if (dataModelContainer instanceof Workplace) {
798 return execWorkplace((Workplace) dataModelContainer);
799 } else {
800 log.error("Invalid context {}", dataModelContainer);
801 return null;
802 }
803 }
804
805 /**
806 * Executes workflow context.
m.rahil09251882019-04-15 22:58:33 +0530807 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900808 * @param context workflow context
809 * @return workflow context
810 */
811 private WorkflowContext execWorkflowContext(WorkflowContext context) {
812
813 Workflow workflow = workflowStore.get(context.workflowId());
814 if (workflow == null) {
815 log.error("Invalid workflow {}", context.workflowId());
816 return null;
817 }
818
819 final WorkflowContext latestContext = workplaceStore.getContext(context.name());
820 if (latestContext == null) {
821 log.error("Invalid workflow context {}", context.name());
822 return null;
823 }
824
825 initWorkletExecution(latestContext);
826
827 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900828 final ProgramCounter pc = workflow.next(latestContext);
jaegonkimf85ee3c2019-04-21 11:10:25 +0900829 final Worklet worklet = workflow.getWorkletInstance(pc);
jaegonkim6a7b5242018-09-12 23:09:42 +0900830
831 if (worklet == Worklet.Common.INIT) {
832 log.error("workflow.next gave INIT. It cannot be executed (context: {})", context.name());
833 return latestContext;
834 }
835
jaegonkime0f45b52018-10-09 20:23:26 +0900836 latestContext.setCurrent(pc);
jaegonkim6a7b5242018-09-12 23:09:42 +0900837 if (worklet == Worklet.Common.COMPLETED) {
838
839 if (workflow.attributes().contains(REMOVE_AFTER_COMPLETE)) {
840 workplaceStore.removeContext(latestContext.name());
841 return null;
842 } else {
843 latestContext.setState(WorkflowState.IDLE);
844 workplaceStore.commitContext(latestContext.name(), latestContext, false);
845 return latestContext;
846 }
847 }
848
jaegonkime0f45b52018-10-09 20:23:26 +0900849 log.info("{} worklet.process:{}", latestContext.name(), worklet.tag());
850 log.trace("{} context: {}", latestContext.name(), latestContext);
851
m.rahil09251882019-04-15 22:58:33 +0530852
jaegonkime0f45b52018-10-09 20:23:26 +0900853 dataModelInjector.inject(worklet, latestContext);
m.rahil09251882019-04-15 22:58:33 +0530854
855 WorkletDescription workletDesc = workflow.getWorkletDesc(pc);
856 if (Objects.nonNull(workletDesc)) {
857 if (!(workletDesc.tag().equals("INIT") || workletDesc.tag().equals("COMPLETED"))) {
858 staticDataModelInjector.inject(worklet, workletDesc);
859 }
860 }
861
jaegonkim6a7b5242018-09-12 23:09:42 +0900862 worklet.process(latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900863 dataModelInjector.inhale(worklet, latestContext);
864
865 log.info("{} worklet.process(done): {}", latestContext.name(), worklet.tag());
866 log.trace("{} context: {}", latestContext.name(), latestContext);
867
jaegonkim6a7b5242018-09-12 23:09:42 +0900868 if (latestContext.completionEventType() != null) {
869 if (latestContext.completionEventGenerator() == null) {
870 String msg = String.format("Invalid exepecting event(%s), generator(%s)",
871 latestContext.completionEventType(),
872 latestContext.completionEventGenerator());
873 throw new WorkflowException(msg);
874 }
875
nitinanandf3f94c62019-02-08 10:36:39 +0530876 registerEventMap(latestContext.completionEventType(), latestContext.completionEventHints(),
nitinanandc8b70252019-04-17 15:35:43 +0530877 latestContext.name(), pc);
jaegonkim6a7b5242018-09-12 23:09:42 +0900878 latestContext.completionEventGenerator().apply();
879
880 if (latestContext.completionEventTimeout() != 0L) {
881 final EventTimeoutTask eventTimeoutTask = EventTimeoutTask.builder()
882 .context(latestContext)
jaegonkime0f45b52018-10-09 20:23:26 +0900883 .programCounter(pc)
jaegonkim6a7b5242018-09-12 23:09:42 +0900884 .eventType(latestContext.completionEventType().getName())
nitinanandf3f94c62019-02-08 10:36:39 +0530885 .eventHintSet(latestContext.completionEventHints())
jaegonkim6a7b5242018-09-12 23:09:42 +0900886 .build();
887 timerChain.schedule(latestContext.completionEventTimeout(),
888 () -> {
889 eventtaskAccumulator.add(eventTimeoutTask);
890 });
891 }
892 } else {
893 if (latestContext.completionEventTimeout() != 0L) {
894 final TimeoutTask timeoutTask = TimeoutTask.builder()
895 .context(latestContext)
jaegonkime0f45b52018-10-09 20:23:26 +0900896 .programCounter(pc)
jaegonkim6a7b5242018-09-12 23:09:42 +0900897 .build();
898
899 timerChain.schedule(latestContext.completionEventTimeout(),
900 () -> {
901 eventtaskAccumulator.add(timeoutTask);
902 });
jaegonkime0f45b52018-10-09 20:23:26 +0900903 } else {
904 //completed case
905 // increase program counter
906 latestContext.setCurrent(workflow.increased(pc));
jaegonkim6a7b5242018-09-12 23:09:42 +0900907 }
908 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900909 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
910
911 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900912 log.error("Exception: ", e);
913 latestContext.setCause(e.getMessage());
914 latestContext.setState(WorkflowState.EXCEPTION);
915 workplaceStore.commitContext(latestContext.name(), latestContext, false);
916 } catch (StorageException e) {
917 log.error("Exception: ", e);
918 // StorageException does not commit context.
919 } catch (Exception e) {
920 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900921 latestContext.setCause(e.getMessage());
922 latestContext.setState(WorkflowState.EXCEPTION);
923 workplaceStore.commitContext(latestContext.name(), latestContext, false);
924 }
925
926 return latestContext;
927 }
928
929 /**
930 * Execute workplace.
m.rahil09251882019-04-15 22:58:33 +0530931 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900932 * @param workplace workplace
933 * @return workplace
934 */
935 private Workplace execWorkplace(Workplace workplace) {
936
937 return null;
938 }
939
jaegonkime0f45b52018-10-09 20:23:26 +0900940}