blob: 8b99bb8a8f8e99c349fbbc4083f142d15f0c372c [file] [log] [blame]
jaegonkim6a7b5242018-09-12 23:09:42 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.node.JsonNodeFactory;
jaegonkim6a7b5242018-09-12 23:09:42 +090019import org.onosproject.cluster.ClusterService;
20import org.onosproject.cluster.LeadershipService;
21import org.onosproject.cluster.NodeId;
22import org.onosproject.core.ApplicationId;
23import org.onosproject.core.CoreService;
jaegonkime0f45b52018-10-09 20:23:26 +090024import org.onosproject.store.service.StorageException;
jaegonkim6a7b5242018-09-12 23:09:42 +090025import org.onosproject.workflow.api.DefaultWorkplace;
26import org.onosproject.workflow.api.EventHintSupplier;
27import org.onosproject.workflow.api.EventTask;
jaegonkime0f45b52018-10-09 20:23:26 +090028import org.onosproject.workflow.api.JsonDataModelInjector;
jaegonkim6a7b5242018-09-12 23:09:42 +090029import org.onosproject.workflow.api.JsonDataModelTree;
jaegonkime0f45b52018-10-09 20:23:26 +090030import org.onosproject.workflow.api.ProgramCounter;
jaegonkim6a7b5242018-09-12 23:09:42 +090031import org.onosproject.workflow.api.SystemWorkflowContext;
32import org.onosproject.workflow.api.EventTimeoutTask;
33import org.onosproject.workflow.api.TimeoutTask;
34import org.onosproject.workflow.api.TimerChain;
35import org.onosproject.workflow.api.Worklet;
36import org.onosproject.workflow.api.Workflow;
37import org.onosproject.workflow.api.WorkflowContext;
38import org.onosproject.workflow.api.WorkflowData;
39import org.onosproject.workflow.api.ContextEventMapStore;
40import org.onosproject.workflow.api.WorkflowState;
41import org.onosproject.workflow.api.WorkflowStore;
42import org.onosproject.workflow.api.WorkflowBatchDelegate;
43import org.onosproject.workflow.api.WorkflowDataEvent;
44import org.onosproject.workflow.api.WorkflowDataListener;
45import org.onosproject.workflow.api.WorkflowException;
46import org.onosproject.workflow.api.HandlerTask;
47import org.onosproject.workflow.api.HandlerTaskBatchDelegate;
48import org.onosproject.workflow.api.Workplace;
49import org.onosproject.workflow.api.WorkplaceStore;
50import org.onosproject.workflow.api.WorkplaceStoreDelegate;
51import org.onosproject.workflow.api.WorkflowExecutionService;
m.rahil09251882019-04-15 22:58:33 +053052import org.onosproject.workflow.api.WorkletDescription;
53import org.onosproject.workflow.api.StaticDataModelInjector;
Sanjana Venkatachalamdbc5d7d2022-04-17 23:15:30 +053054import org.onosproject.workflow.api.WorkflowLoggerInjector;
jaegonkim6a7b5242018-09-12 23:09:42 +090055import org.onosproject.event.AbstractListenerManager;
56import org.onosproject.event.Event;
57import org.onosproject.net.intent.WorkPartitionService;
Ray Milkeydf521292018-10-04 15:13:33 -070058import org.osgi.service.component.annotations.Activate;
59import org.osgi.service.component.annotations.Component;
60import org.osgi.service.component.annotations.Deactivate;
61import org.osgi.service.component.annotations.Reference;
62import org.osgi.service.component.annotations.ReferenceCardinality;
jaegonkim6a7b5242018-09-12 23:09:42 +090063import org.slf4j.Logger;
64
65import java.util.Collection;
66import java.util.List;
67import java.util.Map;
68import java.util.Objects;
nitinanandf3f94c62019-02-08 10:36:39 +053069import java.util.Set;
jaegonkim6a7b5242018-09-12 23:09:42 +090070import java.util.UUID;
71import java.util.concurrent.CompletableFuture;
72import java.util.concurrent.ExecutorService;
73import java.util.concurrent.ScheduledExecutorService;
74import java.util.stream.Collectors;
75
76import static java.util.concurrent.Executors.newFixedThreadPool;
77import static java.util.concurrent.Executors.newSingleThreadExecutor;
78import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
79import static org.onlab.util.Tools.groupedThreads;
jaegonkim566991c2020-03-08 08:52:23 +090080import static org.onosproject.workflow.api.CheckCondition.check;
jaegonkim6a7b5242018-09-12 23:09:42 +090081import static org.onosproject.workflow.api.WorkflowAttribute.REMOVE_AFTER_COMPLETE;
82import static org.slf4j.LoggerFactory.getLogger;
83
Ray Milkeydf521292018-10-04 15:13:33 -070084@Component(immediate = true, service = WorkflowExecutionService.class)
jaegonkim6a7b5242018-09-12 23:09:42 +090085public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, WorkflowDataListener>
86 implements WorkflowExecutionService {
87
88 protected static final Logger log = getLogger(WorkFlowEngine.class);
Ray Milkeydf521292018-10-04 15:13:33 -070089 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090090 protected CoreService coreService;
91
Ray Milkeydf521292018-10-04 15:13:33 -070092 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090093 protected ClusterService clusterService;
94
Ray Milkeydf521292018-10-04 15:13:33 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090096 protected LeadershipService leadershipService;
97
Ray Milkeydf521292018-10-04 15:13:33 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090099 protected WorkPartitionService partitionService;
100
Ray Milkeydf521292018-10-04 15:13:33 -0700101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +0900102 protected WorkplaceStore workplaceStore;
103
Ray Milkeydf521292018-10-04 15:13:33 -0700104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +0900105 protected WorkflowStore workflowStore;
106
Ray Milkeydf521292018-10-04 15:13:33 -0700107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +0900108 protected ContextEventMapStore eventMapStore;
109
110 private final WorkplaceStoreDelegate workplaceStoreDelegate = this::post;
111
112 private final WorkflowBatchDelegate workflowBatchDelegate = new InternalWorkflowBatchDelegate();
113 private final WorkflowAccumulator workflowAccumulator = new WorkflowAccumulator(workflowBatchDelegate);
114
115 private final HandlerTaskBatchDelegate eventtaskBatchDelegate = new InternalHandlerTaskBatchDelegate();
116 private final HandlerTaskAccumulator eventtaskAccumulator = new HandlerTaskAccumulator(eventtaskBatchDelegate);
117
118 private ExecutorService workflowBatchExecutor;
119 private ExecutorService workflowExecutor;
120
121 private ExecutorService handlerTaskBatchExecutor;
122 private ExecutorService handlerTaskExecutor;
123
124 private static final int DEFAULT_WORKFLOW_THREADS = 12;
125 private static final int DEFAULT_EVENTTASK_THREADS = 12;
126 private static final int MAX_REGISTER_EVENTMAP_WAITS = 10;
Sanjana Venkatachalamdbc5d7d2022-04-17 23:15:30 +0530127 private static final String ERROR = "ERROR";
jaegonkim6a7b5242018-09-12 23:09:42 +0900128
129 private ScheduledExecutorService eventMapTriggerExecutor;
130
131 private TimerChain timerChain = new TimerChain();
132
jaegonkime0f45b52018-10-09 20:23:26 +0900133 private JsonDataModelInjector dataModelInjector = new JsonDataModelInjector();
m.rahil09251882019-04-15 22:58:33 +0530134 private StaticDataModelInjector staticDataModelInjector = new StaticDataModelInjector();
Sanjana Venkatachalamdbc5d7d2022-04-17 23:15:30 +0530135 private WorkflowLoggerInjector workflowLoggerInjector = new WorkflowLoggerInjector();
jaegonkime0f45b52018-10-09 20:23:26 +0900136
jaegonkim6a7b5242018-09-12 23:09:42 +0900137 public static final String APPID = "org.onosproject.workflow";
138 private ApplicationId appId;
139 private NodeId localNodeId;
140
141 @Activate
142 public void activate() {
143 appId = coreService.registerApplication(APPID);
144 workplaceStore.setDelegate(workplaceStoreDelegate);
145 localNodeId = clusterService.getLocalNode().id();
146 leadershipService.runForLeadership(appId.name());
147
148 workflowBatchExecutor = newSingleThreadExecutor(
149 groupedThreads("onos/workflow", "workflow-batch", log));
150 workflowExecutor = newFixedThreadPool(DEFAULT_WORKFLOW_THREADS,
151 groupedThreads("onos/workflow-exec", "worker-%d", log));
152 handlerTaskBatchExecutor = newSingleThreadExecutor(
153 groupedThreads("onos/workflow", "handlertask-batch", log));
154 handlerTaskExecutor = newFixedThreadPool(DEFAULT_EVENTTASK_THREADS,
155 groupedThreads("onos/handlertask-exec", "worker-%d", log));
156 eventMapTriggerExecutor = newSingleThreadScheduledExecutor(
157 groupedThreads("onos/workflow-engine", "eventmap-trigger-executor"));
158
159 (new WorkplaceWorkflow(this, workflowStore)).registerWorkflows();
160 JsonDataModelTree data = new JsonDataModelTree(JsonNodeFactory.instance.objectNode());
161 workplaceStore.registerWorkplace(Workplace.SYSTEM_WORKPLACE,
162 new DefaultWorkplace(Workplace.SYSTEM_WORKPLACE, data));
163
164 log.info("Started");
165 }
166
167 @Deactivate
168 public void deactivate() {
169 leadershipService.withdraw(appId.name());
170 workplaceStore.unsetDelegate(workplaceStoreDelegate);
171 workflowBatchExecutor.shutdown();
172 workflowExecutor.shutdown();
173 handlerTaskBatchExecutor.shutdown();
174 handlerTaskExecutor.shutdown();
175 eventMapTriggerExecutor.shutdown();
176 log.info("Stopped");
177 }
178
179 @Override
180 public void execInitWorklet(WorkflowContext context) {
181
182 Workflow workflow = workflowStore.get(context.workflowId());
183 if (workflow == null) {
jaegonkimce75d3c2020-03-22 00:44:29 +0900184 log.error("Invalid workflow id:{}", context.workflowId());
jaegonkim6a7b5242018-09-12 23:09:42 +0900185 return;
186 }
187
188 initWorkletExecution(context);
189 try {
190 Worklet initWorklet = workflow.init(context);
191 if (initWorklet != null) {
jaegonkime0f45b52018-10-09 20:23:26 +0900192
193 log.info("{} worklet.process:{}", context.name(), initWorklet.tag());
194 log.trace("{} context: {}", context.name(), context);
195
Sanjana Venkatachalamdbc5d7d2022-04-17 23:15:30 +0530196 workflowLoggerInjector.inject(initWorklet, context);
jaegonkime0f45b52018-10-09 20:23:26 +0900197 dataModelInjector.inject(initWorklet, context);
jaegonkim6a7b5242018-09-12 23:09:42 +0900198 initWorklet.process(context);
jaegonkime0f45b52018-10-09 20:23:26 +0900199 dataModelInjector.inhale(initWorklet, context);
200
201 log.info("{} worklet.process(done): {}", context.name(), initWorklet.tag());
202 log.trace("{} context: {}", context.name(), context);
jaegonkim6a7b5242018-09-12 23:09:42 +0900203 }
204
jaegonkim566991c2020-03-08 08:52:23 +0900205 check(workplaceStore.getContext(context.name()) == null,
206 "Duplicated workflow context(" + context.name() + ") assignment.");
207
jaegonkim6a7b5242018-09-12 23:09:42 +0900208 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900209 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900210 context.setCause(e.getMessage());
211 context.setState(WorkflowState.EXCEPTION);
212 workplaceStore.commitContext(context.name(), context, false);
213 return;
214 }
215 // trigger the execution of next worklet.
216 workplaceStore.registerContext(context.name(), context);
217 }
218
219 @Override
jaegonkime0f45b52018-10-09 20:23:26 +0900220 public void eval(String contextName) {
221
222 final WorkflowContext latestContext = workplaceStore.getContext(contextName);
223 if (latestContext == null) {
224 log.error("Invalid workflow context {}", contextName);
225 return;
226 }
227
228 initWorkletExecution(latestContext);
229
230 workplaceStore.commitContext(latestContext.name(), latestContext, true);
nitinanandc8b70252019-04-17 15:35:43 +0530231
jaegonkime0f45b52018-10-09 20:23:26 +0900232 }
233
234 @Override
jaegonkim6a7b5242018-09-12 23:09:42 +0900235 public void eventMapTrigger(Event event, EventHintSupplier supplier) {
236
237 if (event.subject() instanceof SystemWorkflowContext) {
238 return;
239 }
240
jaegonkimce75d3c2020-03-22 00:44:29 +0900241 Map<String, String> eventMap;
jaegonkim6a7b5242018-09-12 23:09:42 +0900242
243 String eventHint;
244 try {
245 eventHint = supplier.apply(event);
246 } catch (Throwable e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900247 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900248 return;
249 }
250 if (eventHint == null) {
251 // do nothing
252 log.error("Invalid eventHint, event: {}", event);
253 return;
254 }
255
256 try {
nitinanandf3f94c62019-02-08 10:36:39 +0530257 eventMap = eventMapStore.getEventMapByHint(event.getClass().getName(), eventHint);
jaegonkimce75d3c2020-03-22 00:44:29 +0900258 } catch (WorkflowException e) {
259 log.error("Exception: ", e);
260 return;
261 }
262
263 if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
264 // do nothing;
265 log.debug("Invalid eventMap, event: {}", event);
266 return;
267 }
268
269 for (Map.Entry<String, String> entry : eventMap.entrySet()) {
270 String contextName = entry.getKey();
271 String strProgramCounter = entry.getValue();
272 ProgramCounter pc;
273 try {
274 pc = ProgramCounter.valueOf(strProgramCounter);
275 } catch (IllegalArgumentException e) {
276 log.error("Exception: ", e);
jaegonkime0f45b52018-10-09 20:23:26 +0900277 return;
278 }
279
jaegonkimce75d3c2020-03-22 00:44:29 +0900280 WorkflowContext context = workplaceStore.getContext(contextName);
281 if (Objects.isNull(context)) {
282 log.info("Invalid context: {}, event: {}", contextName, event);
283 continue;
284 }
285 EventTask eventtask = null;
286 try {
287 eventtask = EventTask.builder()
288 .event(event)
289 .eventHint(eventHint)
290 .context(context)
291 .programCounter(pc)
292 .build();
293 } catch (WorkflowException e) {
294 log.error("Exception: ", e);
jaegonkime0f45b52018-10-09 20:23:26 +0900295 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900296
jaegonkimce75d3c2020-03-22 00:44:29 +0900297 log.debug("eventtaskAccumulator.add: task: {}", eventtask);
298 if (!Objects.isNull(eventtask)) {
299 eventtaskAccumulator.add(eventtask);
300 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900301 }
302 }
303
304 @Override
nitinanandf3f94c62019-02-08 10:36:39 +0530305 public void registerEventMap(Class<? extends Event> eventType, Set<String> eventHintSet,
jaegonkimce75d3c2020-03-22 00:44:29 +0900306 String contextName, String programCounterString) throws WorkflowException {
307 eventMapStore.registerEventMap(eventType.getName(), eventHintSet, contextName, programCounterString);
nitinanandf3f94c62019-02-08 10:36:39 +0530308 for (String eventHint : eventHintSet) {
309 for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
jaegonkimce75d3c2020-03-22 00:44:29 +0900310 Map<String, String> eventMap = eventMapStore.getEventMapByHint(eventType.getName(), eventHint);
nitinanandf3f94c62019-02-08 10:36:39 +0530311 if (eventMap != null && eventMap.containsKey(contextName)) {
312 break;
313 }
314 try {
315 log.info("sleep {}", i);
316 Thread.sleep(10L * (i + 1));
317 } catch (InterruptedException e) {
318 log.error("Exception: ", e);
319 Thread.currentThread().interrupt();
320 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900321 }
322 }
nitinanandf3f94c62019-02-08 10:36:39 +0530323
jaegonkim6a7b5242018-09-12 23:09:42 +0900324 }
325
326 @Override
327 protected void post(WorkflowDataEvent event) {
328
329 if (event.subject() == null || !isRelevant(event.subject())) {
330 log.debug("ignore event {}", event);
331 return;
332 }
333
334 // trigger next worklet selection
335 WorkflowData dataModelContainer = event.subject();
336 switch (event.type()) {
337 case INSERT:
338 case UPDATE:
339 if (dataModelContainer.triggerNext()) {
jaegonkime0f45b52018-10-09 20:23:26 +0900340 log.debug("workflowAccumulator.add: {}", dataModelContainer);
jaegonkim6a7b5242018-09-12 23:09:42 +0900341 workflowAccumulator.add(dataModelContainer);
342 } else {
343 log.debug("pass-workflowAccumulator.add: {}", dataModelContainer);
344 }
345 break;
346 case REMOVE:
347 break;
348 default:
349 }
350
351 // trigger EventTask for WorkflowDataEvent
352 eventMapTriggerExecutor.submit(
353 () -> eventMapTrigger(
354 event,
355 // event hint supplier
356 (ev) -> {
357 if (ev == null || ev.subject() == null) {
358 return null;
359 }
360
361 if (ev.subject() instanceof WorkflowData) {
362 return ((WorkflowData) ev.subject()).name();
363 } else {
364 return null;
365 }
366 }
367 )
368 );
369 }
370
371 /**
372 * Checks whether this workflow data job is relevant to this ONOS node.
m.rahil09251882019-04-15 22:58:33 +0530373 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900374 * @param job workflow data
375 * @return checking result
376 */
377 private boolean isRelevant(WorkflowData job) {
378 // distributes event processing with work-partition
379 return partitionService.isMine(job.distributor(), this::stringHash);
380 }
381
382 /**
383 * Gets hash of the string.
m.rahil09251882019-04-15 22:58:33 +0530384 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900385 * @param str string to get a hash
386 * @return hash value
387 */
388 public Long stringHash(String str) {
389 return UUID.nameUUIDFromBytes(str.getBytes()).getMostSignificantBits();
390 }
391
392 /**
393 * Class for handler task batch delegation.
394 */
395 private class InternalHandlerTaskBatchDelegate implements HandlerTaskBatchDelegate {
396 @Override
397 public void execute(Collection<Collection<HandlerTask>> operations) {
398 log.debug("Execute {} operation(s).", operations.size());
399
400 CompletableFuture.runAsync(() -> {
401 List<CompletableFuture<Collection<HandlerTask>>> futures = operations.stream()
402 .map(
403 x -> CompletableFuture.completedFuture(x)
404 .thenApplyAsync(WorkFlowEngine.this::processHandlerTask, handlerTaskExecutor)
405 .exceptionally(e -> null)
406 )
407 .collect(Collectors.toList());
408
409 // waiting the completion of futures
410 futures.parallelStream().forEach(x -> x.join());
411
412 }, handlerTaskBatchExecutor).exceptionally(e -> {
413 log.error("Error submitting batches:", e);
414 return null;
415 }).thenRun(eventtaskAccumulator::ready);
416 }
417 }
418
419 /**
420 * Initializes worklet execution.
m.rahil09251882019-04-15 22:58:33 +0530421 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900422 * @param context workflow context
423 */
424 private void initWorkletExecution(WorkflowContext context) {
425 context.setState(WorkflowState.RUNNING);
426 context.setCause("");
427 context.setWorkflowExecutionService(this);
428 context.setWorkflowStore(workflowStore);
429 context.setWorkplaceStore(workplaceStore);
430 context.waitCompletion(null, null, null, 0L);
431 context.setTriggerNext(false);
432 }
433
434 /**
435 * Processes handler tasks.
m.rahil09251882019-04-15 22:58:33 +0530436 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900437 * @param tasks handler tasks
438 * @return handler tasks processed
439 */
440 private Collection<HandlerTask> processHandlerTask(Collection<HandlerTask> tasks) {
441
442 for (HandlerTask task : tasks) {
443 if (task instanceof EventTimeoutTask) {
444 execEventTimeoutTask((EventTimeoutTask) task);
445 } else if (task instanceof TimeoutTask) {
446 execTimeoutTask((TimeoutTask) task);
447 } else if (task instanceof EventTask) {
448 execEventTask((EventTask) task);
449 } else {
450 log.error("Unsupported handler task {}", task);
451 }
452 }
453
454 return null;
455 }
456
457 /**
458 * Executes event task.
m.rahil09251882019-04-15 22:58:33 +0530459 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900460 * @param task event task
461 * @return event task
462 */
463 private EventTask execEventTask(EventTask task) {
464
jaegonkimce75d3c2020-03-22 00:44:29 +0900465 if (!eventMapStore.isEventMapPresent(task.context().name())) {
466 log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name());
jaegonkim6a7b5242018-09-12 23:09:42 +0900467 return task;
468 }
469
470 log.debug("execEventTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
471
jaegonkimce75d3c2020-03-22 00:44:29 +0900472 WorkflowContext context = (WorkflowContext) (task.context());
jaegonkim6a7b5242018-09-12 23:09:42 +0900473 Workflow workflow = workflowStore.get(context.workflowId());
474 if (workflow == null) {
475 log.error("Invalid workflow {}", context.workflowId());
476 return task;
477 }
478
479 WorkflowContext latestContext = workplaceStore.getContext(context.name());
480 if (latestContext == null) {
481 log.error("Invalid workflow context {}", context.name());
482 return task;
483 }
484
485 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900486 if (!Objects.equals(latestContext.current(), task.programCounter())) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900487 log.error("Current worklet({}) is not mismatched with task work({}). Ignored.",
jaegonkime0f45b52018-10-09 20:23:26 +0900488 latestContext.current(), task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900489 return task;
490 }
491
jaegonkimf85ee3c2019-04-21 11:10:25 +0900492 Worklet worklet = workflow.getWorkletInstance(task.programCounter());
jaegonkime0f45b52018-10-09 20:23:26 +0900493 if (Worklet.Common.COMPLETED.equals(worklet) || Worklet.Common.INIT.equals(worklet)) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900494 log.error("Current worklet is {}, Ignored", worklet);
495 return task;
496 }
497
498 initWorkletExecution(latestContext);
499
jaegonkime0f45b52018-10-09 20:23:26 +0900500 log.info("{} worklet.isCompleted:{}", latestContext.name(), worklet.tag());
501 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
502
Sanjana Venkatachalamdbc5d7d2022-04-17 23:15:30 +0530503 workflowLoggerInjector.inject(worklet, latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900504 dataModelInjector.inject(worklet, latestContext);
505 boolean completed = worklet.isCompleted(latestContext, task.event());
506 dataModelInjector.inhale(worklet, latestContext);
507
508 if (completed) {
509
510 log.info("{} worklet.isCompleted(true):{}", latestContext.name(), worklet.tag());
511 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
512
jaegonkim6a7b5242018-09-12 23:09:42 +0900513 eventMapStore.unregisterEventMap(
nitinanandf3f94c62019-02-08 10:36:39 +0530514 task.eventType(), latestContext.name());
jaegonkim6a7b5242018-09-12 23:09:42 +0900515
jaegonkime0f45b52018-10-09 20:23:26 +0900516 //completed case
nitinanandf3f94c62019-02-08 10:36:39 +0530517 //increase program counter
jaegonkime0f45b52018-10-09 20:23:26 +0900518 ProgramCounter pc = latestContext.current();
519 latestContext.setCurrent(workflow.increased(pc));
520
jaegonkim6a7b5242018-09-12 23:09:42 +0900521 workplaceStore.commitContext(latestContext.name(), latestContext, true);
522 return null;
523 } else {
jaegonkime0f45b52018-10-09 20:23:26 +0900524
525 log.info("{} worklet.isCompleted(false):{}", latestContext.name(), worklet.tag());
526 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
527
jaegonkim6a7b5242018-09-12 23:09:42 +0900528 workplaceStore.commitContext(latestContext.name(), latestContext, false);
529 }
530
531 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900532 log.error("Exception: ", e);
533 latestContext.setCause(e.getMessage());
534 latestContext.setState(WorkflowState.EXCEPTION);
535 workplaceStore.commitContext(latestContext.name(), latestContext, false);
536 } catch (StorageException e) {
537 log.error("Exception: ", e);
538 // StorageException does not commit context.
539 } catch (Exception e) {
540 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900541 latestContext.setCause(e.getMessage());
542 latestContext.setState(WorkflowState.EXCEPTION);
543 workplaceStore.commitContext(latestContext.name(), latestContext, false);
544 }
545
546 return task;
547 }
548
549 /**
550 * Executes event timeout task.
m.rahil09251882019-04-15 22:58:33 +0530551 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900552 * @param task event timeout task
553 * @return handler task
554 */
555 private HandlerTask execEventTimeoutTask(EventTimeoutTask task) {
556
nitinanandf3f94c62019-02-08 10:36:39 +0530557 if (!eventMapStore.isEventMapPresent(task.context().name())) {
558 log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name());
jaegonkim6a7b5242018-09-12 23:09:42 +0900559 return task;
560 }
561
562 log.debug("execEventTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
563
jaegonkime0f45b52018-10-09 20:23:26 +0900564 WorkflowContext context = (WorkflowContext) (task.context());
jaegonkim6a7b5242018-09-12 23:09:42 +0900565 Workflow workflow = workflowStore.get(context.workflowId());
566 if (workflow == null) {
567 log.error("execEventTimeoutTask: Invalid workflow {}", context.workflowId());
568 return task;
569 }
570
571 WorkflowContext latestContext = workplaceStore.getContext(context.name());
572 if (latestContext == null) {
573 log.error("execEventTimeoutTask: Invalid workflow context {}", context.name());
574 return task;
575 }
576
577 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900578 if (!Objects.equals(latestContext.current(), task.programCounter())) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900579 log.error("execEventTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
jaegonkime0f45b52018-10-09 20:23:26 +0900580 latestContext.current(), task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900581 return task;
582 }
583
jaegonkimf85ee3c2019-04-21 11:10:25 +0900584 Worklet worklet = workflow.getWorkletInstance(task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900585 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
586 log.error("execEventTimeoutTask: Current worklet is {}, Ignored", worklet);
587 return task;
588 }
589
590 initWorkletExecution(latestContext);
591
nitinanandf3f94c62019-02-08 10:36:39 +0530592 eventMapStore.unregisterEventMap(task.eventType(), latestContext.name());
jaegonkim6a7b5242018-09-12 23:09:42 +0900593
jaegonkime0f45b52018-10-09 20:23:26 +0900594 log.info("{} worklet.timeout(for event):{}", latestContext.name(), worklet.tag());
595 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
596
Sanjana Venkatachalamdbc5d7d2022-04-17 23:15:30 +0530597 workflowLoggerInjector.inject(worklet, latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900598 dataModelInjector.inject(worklet, latestContext);
m.rahil09251882019-04-15 22:58:33 +0530599
600 WorkletDescription workletDesc = workflow.getWorkletDesc(task.programCounter());
601 if (Objects.nonNull(workletDesc)) {
602 if (!(workletDesc.tag().equals("INIT") || workletDesc.tag().equals("COMPLETED"))) {
603 staticDataModelInjector.inject(worklet, workletDesc);
604 }
605 }
606
jaegonkim6a7b5242018-09-12 23:09:42 +0900607 worklet.timeout(latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900608 dataModelInjector.inhale(worklet, latestContext);
609
610 log.info("{} worklet.timeout(for event)(done):{}", latestContext.name(), worklet.tag());
611 log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
612
613
jaegonkim6a7b5242018-09-12 23:09:42 +0900614 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
615
616 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900617 log.error("Exception: ", e);
618 latestContext.setCause(e.getMessage());
619 latestContext.setState(WorkflowState.EXCEPTION);
620 workplaceStore.commitContext(latestContext.name(), latestContext, false);
621 } catch (StorageException e) {
622 log.error("Exception: ", e);
623 // StorageException does not commit context.
624 } catch (Exception e) {
625 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900626 latestContext.setCause(e.getMessage());
627 latestContext.setState(WorkflowState.EXCEPTION);
628 workplaceStore.commitContext(latestContext.name(), latestContext, false);
629 }
630
631 return task;
632 }
633
634 /**
635 * Executes timeout task.
m.rahil09251882019-04-15 22:58:33 +0530636 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900637 * @param task time out task
638 * @return handler task
639 */
640 private HandlerTask execTimeoutTask(TimeoutTask task) {
641
642 log.debug("execTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
643
644 WorkflowContext context = (WorkflowContext) (task.context());
645 Workflow workflow = workflowStore.get(context.workflowId());
646 if (workflow == null) {
647 log.error("execTimeoutTask: Invalid workflow {}", context.workflowId());
648 return task;
649 }
650
651 WorkflowContext latestContext = workplaceStore.getContext(context.name());
652 if (latestContext == null) {
653 log.error("execTimeoutTask: Invalid workflow context {}", context.name());
654 return task;
655 }
656
657 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900658 if (!Objects.equals(latestContext.current(), task.programCounter())) {
jaegonkim6a7b5242018-09-12 23:09:42 +0900659 log.error("execTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
jaegonkime0f45b52018-10-09 20:23:26 +0900660 latestContext.current(), task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900661 return task;
662 }
663
jaegonkimf85ee3c2019-04-21 11:10:25 +0900664 Worklet worklet = workflow.getWorkletInstance(task.programCounter());
jaegonkim6a7b5242018-09-12 23:09:42 +0900665 if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
666 log.error("execTimeoutTask: Current worklet is {}, Ignored", worklet);
667 return task;
668 }
669
670 initWorkletExecution(latestContext);
671
jaegonkime0f45b52018-10-09 20:23:26 +0900672 log.info("{} worklet.timeout:{}", latestContext.name(), worklet.tag());
673 log.trace("{} context: {}", latestContext.name(), latestContext);
674
Sanjana Venkatachalamdbc5d7d2022-04-17 23:15:30 +0530675 workflowLoggerInjector.inject(worklet, latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900676 dataModelInjector.inject(worklet, latestContext);
m.rahil09251882019-04-15 22:58:33 +0530677
678 WorkletDescription workletDesc = workflow.getWorkletDesc(task.programCounter());
679 if (Objects.nonNull(workletDesc)) {
680 if (!(workletDesc.tag().equals("INIT") || workletDesc.tag().equals("COMPLETED"))) {
681 staticDataModelInjector.inject(worklet, workletDesc);
682 }
683 }
684
jaegonkim6a7b5242018-09-12 23:09:42 +0900685 worklet.timeout(latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900686 dataModelInjector.inhale(worklet, latestContext);
687
688 log.info("{} worklet.timeout(done):{}", latestContext.name(), worklet.tag());
689 log.trace("{} context: {}", latestContext.name(), latestContext);
690
jaegonkim6a7b5242018-09-12 23:09:42 +0900691 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
692
693 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900694 log.error("Exception: ", e);
695 latestContext.setCause(e.getMessage());
696 latestContext.setState(WorkflowState.EXCEPTION);
697 workplaceStore.commitContext(latestContext.name(), latestContext, false);
698 } catch (StorageException e) {
699 log.error("Exception: ", e);
700 // StorageException does not commit context.
701 } catch (Exception e) {
702 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900703 latestContext.setCause(e.getMessage());
704 latestContext.setState(WorkflowState.EXCEPTION);
705 workplaceStore.commitContext(latestContext.name(), latestContext, false);
706 }
707
708 return task;
709 }
710
711 /**
712 * Class for delegation of workflow batch execution.
713 */
714 private class InternalWorkflowBatchDelegate implements WorkflowBatchDelegate {
715 @Override
716 public void execute(Collection<WorkflowData> operations) {
717 log.debug("Execute {} operation(s).", operations.size());
718
719 CompletableFuture.runAsync(() -> {
720 List<CompletableFuture<WorkflowData>> futures = operations.stream()
721 .map(
722 x -> CompletableFuture.completedFuture(x)
m.rahil09251882019-04-15 22:58:33 +0530723 .thenApplyAsync(WorkFlowEngine.this::execWorkflow, workflowExecutor)
724 .exceptionally(e -> null)
jaegonkim6a7b5242018-09-12 23:09:42 +0900725 )
726 .collect(Collectors.toList());
727
728 // waiting the completion of futures
729 futures.parallelStream().forEach(x -> x.join());
730
731 }, workflowBatchExecutor).exceptionally(e -> {
732 log.error("Error submitting batches:", e);
733 return null;
734 }).thenRun(workflowAccumulator::ready);
735 }
736 }
737
738 /**
739 * Executes workflow.
m.rahil09251882019-04-15 22:58:33 +0530740 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900741 * @param dataModelContainer workflow data model container(workflow or workplace)
742 * @return
743 */
744 private WorkflowData execWorkflow(WorkflowData dataModelContainer) {
745 if (dataModelContainer instanceof WorkflowContext) {
746 return execWorkflowContext((WorkflowContext) dataModelContainer);
747 } else if (dataModelContainer instanceof Workplace) {
748 return execWorkplace((Workplace) dataModelContainer);
749 } else {
750 log.error("Invalid context {}", dataModelContainer);
751 return null;
752 }
753 }
754
755 /**
756 * Executes workflow context.
m.rahil09251882019-04-15 22:58:33 +0530757 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900758 * @param context workflow context
759 * @return workflow context
760 */
761 private WorkflowContext execWorkflowContext(WorkflowContext context) {
762
763 Workflow workflow = workflowStore.get(context.workflowId());
764 if (workflow == null) {
765 log.error("Invalid workflow {}", context.workflowId());
766 return null;
767 }
768
769 final WorkflowContext latestContext = workplaceStore.getContext(context.name());
770 if (latestContext == null) {
771 log.error("Invalid workflow context {}", context.name());
772 return null;
773 }
774
775 initWorkletExecution(latestContext);
776
777 try {
jaegonkime0f45b52018-10-09 20:23:26 +0900778 final ProgramCounter pc = workflow.next(latestContext);
jaegonkimf85ee3c2019-04-21 11:10:25 +0900779 final Worklet worklet = workflow.getWorkletInstance(pc);
jaegonkim6a7b5242018-09-12 23:09:42 +0900780
781 if (worklet == Worklet.Common.INIT) {
782 log.error("workflow.next gave INIT. It cannot be executed (context: {})", context.name());
783 return latestContext;
784 }
785
jaegonkime0f45b52018-10-09 20:23:26 +0900786 latestContext.setCurrent(pc);
jaegonkim6a7b5242018-09-12 23:09:42 +0900787 if (worklet == Worklet.Common.COMPLETED) {
788
789 if (workflow.attributes().contains(REMOVE_AFTER_COMPLETE)) {
790 workplaceStore.removeContext(latestContext.name());
791 return null;
792 } else {
793 latestContext.setState(WorkflowState.IDLE);
794 workplaceStore.commitContext(latestContext.name(), latestContext, false);
795 return latestContext;
796 }
797 }
798
jaegonkime0f45b52018-10-09 20:23:26 +0900799 log.info("{} worklet.process:{}", latestContext.name(), worklet.tag());
800 log.trace("{} context: {}", latestContext.name(), latestContext);
801
m.rahil09251882019-04-15 22:58:33 +0530802
Sanjana Venkatachalamdbc5d7d2022-04-17 23:15:30 +0530803 workflowLoggerInjector.inject(worklet, latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900804 dataModelInjector.inject(worklet, latestContext);
m.rahil09251882019-04-15 22:58:33 +0530805
806 WorkletDescription workletDesc = workflow.getWorkletDesc(pc);
807 if (Objects.nonNull(workletDesc)) {
808 if (!(workletDesc.tag().equals("INIT") || workletDesc.tag().equals("COMPLETED"))) {
809 staticDataModelInjector.inject(worklet, workletDesc);
810 }
811 }
812
jaegonkim6a7b5242018-09-12 23:09:42 +0900813 worklet.process(latestContext);
jaegonkime0f45b52018-10-09 20:23:26 +0900814 dataModelInjector.inhale(worklet, latestContext);
815
816 log.info("{} worklet.process(done): {}", latestContext.name(), worklet.tag());
817 log.trace("{} context: {}", latestContext.name(), latestContext);
818
jaegonkim6a7b5242018-09-12 23:09:42 +0900819 if (latestContext.completionEventType() != null) {
820 if (latestContext.completionEventGenerator() == null) {
821 String msg = String.format("Invalid exepecting event(%s), generator(%s)",
822 latestContext.completionEventType(),
823 latestContext.completionEventGenerator());
824 throw new WorkflowException(msg);
825 }
826
nitinanandf3f94c62019-02-08 10:36:39 +0530827 registerEventMap(latestContext.completionEventType(), latestContext.completionEventHints(),
jaegonkimce75d3c2020-03-22 00:44:29 +0900828 latestContext.name(), pc.toString());
829
jaegonkim6a7b5242018-09-12 23:09:42 +0900830 latestContext.completionEventGenerator().apply();
831
832 if (latestContext.completionEventTimeout() != 0L) {
833 final EventTimeoutTask eventTimeoutTask = EventTimeoutTask.builder()
834 .context(latestContext)
jaegonkime0f45b52018-10-09 20:23:26 +0900835 .programCounter(pc)
jaegonkim6a7b5242018-09-12 23:09:42 +0900836 .eventType(latestContext.completionEventType().getName())
nitinanandf3f94c62019-02-08 10:36:39 +0530837 .eventHintSet(latestContext.completionEventHints())
jaegonkim6a7b5242018-09-12 23:09:42 +0900838 .build();
839 timerChain.schedule(latestContext.completionEventTimeout(),
840 () -> {
841 eventtaskAccumulator.add(eventTimeoutTask);
842 });
843 }
844 } else {
845 if (latestContext.completionEventTimeout() != 0L) {
846 final TimeoutTask timeoutTask = TimeoutTask.builder()
847 .context(latestContext)
jaegonkime0f45b52018-10-09 20:23:26 +0900848 .programCounter(pc)
jaegonkim6a7b5242018-09-12 23:09:42 +0900849 .build();
850
851 timerChain.schedule(latestContext.completionEventTimeout(),
852 () -> {
853 eventtaskAccumulator.add(timeoutTask);
854 });
jaegonkime0f45b52018-10-09 20:23:26 +0900855 } else {
856 //completed case
857 // increase program counter
858 latestContext.setCurrent(workflow.increased(pc));
jaegonkim6a7b5242018-09-12 23:09:42 +0900859 }
860 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900861 workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
862
863 } catch (WorkflowException e) {
jaegonkime0f45b52018-10-09 20:23:26 +0900864 log.error("Exception: ", e);
865 latestContext.setCause(e.getMessage());
866 latestContext.setState(WorkflowState.EXCEPTION);
867 workplaceStore.commitContext(latestContext.name(), latestContext, false);
868 } catch (StorageException e) {
869 log.error("Exception: ", e);
870 // StorageException does not commit context.
871 } catch (Exception e) {
872 log.error("Exception: ", e);
jaegonkim6a7b5242018-09-12 23:09:42 +0900873 latestContext.setCause(e.getMessage());
874 latestContext.setState(WorkflowState.EXCEPTION);
875 workplaceStore.commitContext(latestContext.name(), latestContext, false);
876 }
877
878 return latestContext;
879 }
880
881 /**
882 * Execute workplace.
m.rahil09251882019-04-15 22:58:33 +0530883 *
jaegonkim6a7b5242018-09-12 23:09:42 +0900884 * @param workplace workplace
885 * @return workplace
886 */
887 private Workplace execWorkplace(Workplace workplace) {
888
889 return null;
890 }
891
jaegonkime0f45b52018-10-09 20:23:26 +0900892}