Merge remote-tracking branch 'origin/master' into merge-master
Change-Id: I4608093c4400a313b253508ac6bc8a84ecba5c7e
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
new file mode 100644
index 0000000..15cd8cd
--- /dev/null
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
@@ -0,0 +1,738 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.workflow.impl;
+
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.google.common.collect.Lists;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.workflow.api.DefaultWorkplace;
+import org.onosproject.workflow.api.EventHintSupplier;
+import org.onosproject.workflow.api.EventTask;
+import org.onosproject.workflow.api.JsonDataModelTree;
+import org.onosproject.workflow.api.SystemWorkflowContext;
+import org.onosproject.workflow.api.EventTimeoutTask;
+import org.onosproject.workflow.api.TimeoutTask;
+import org.onosproject.workflow.api.TimerChain;
+import org.onosproject.workflow.api.Worklet;
+import org.onosproject.workflow.api.Workflow;
+import org.onosproject.workflow.api.WorkflowContext;
+import org.onosproject.workflow.api.WorkflowData;
+import org.onosproject.workflow.api.ContextEventMapStore;
+import org.onosproject.workflow.api.WorkflowState;
+import org.onosproject.workflow.api.WorkflowStore;
+import org.onosproject.workflow.api.WorkflowBatchDelegate;
+import org.onosproject.workflow.api.WorkflowDataEvent;
+import org.onosproject.workflow.api.WorkflowDataListener;
+import org.onosproject.workflow.api.WorkflowException;
+import org.onosproject.workflow.api.HandlerTask;
+import org.onosproject.workflow.api.HandlerTaskBatchDelegate;
+import org.onosproject.workflow.api.Workplace;
+import org.onosproject.workflow.api.WorkplaceStore;
+import org.onosproject.workflow.api.WorkplaceStoreDelegate;
+import org.onosproject.workflow.api.WorkflowExecutionService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.event.Event;
+import org.onosproject.net.intent.WorkPartitionService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.workflow.api.WorkflowAttribute.REMOVE_AFTER_COMPLETE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+@Component(immediate = true, service = WorkflowExecutionService.class)
+public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, WorkflowDataListener>
+ implements WorkflowExecutionService {
+
+ protected static final Logger log = getLogger(WorkFlowEngine.class);
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected WorkPartitionService partitionService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected WorkplaceStore workplaceStore;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected WorkflowStore workflowStore;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ContextEventMapStore eventMapStore;
+
+ private final WorkplaceStoreDelegate workplaceStoreDelegate = this::post;
+
+ private final WorkflowBatchDelegate workflowBatchDelegate = new InternalWorkflowBatchDelegate();
+ private final WorkflowAccumulator workflowAccumulator = new WorkflowAccumulator(workflowBatchDelegate);
+
+ private final HandlerTaskBatchDelegate eventtaskBatchDelegate = new InternalHandlerTaskBatchDelegate();
+ private final HandlerTaskAccumulator eventtaskAccumulator = new HandlerTaskAccumulator(eventtaskBatchDelegate);
+
+ private ExecutorService workflowBatchExecutor;
+ private ExecutorService workflowExecutor;
+
+ private ExecutorService handlerTaskBatchExecutor;
+ private ExecutorService handlerTaskExecutor;
+
+ private static final int DEFAULT_WORKFLOW_THREADS = 12;
+ private static final int DEFAULT_EVENTTASK_THREADS = 12;
+ private static final int MAX_REGISTER_EVENTMAP_WAITS = 10;
+
+ private ScheduledExecutorService eventMapTriggerExecutor;
+
+ private TimerChain timerChain = new TimerChain();
+
+ public static final String APPID = "org.onosproject.workflow";
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+ @Activate
+ public void activate() {
+ appId = coreService.registerApplication(APPID);
+ workplaceStore.setDelegate(workplaceStoreDelegate);
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipService.runForLeadership(appId.name());
+
+ workflowBatchExecutor = newSingleThreadExecutor(
+ groupedThreads("onos/workflow", "workflow-batch", log));
+ workflowExecutor = newFixedThreadPool(DEFAULT_WORKFLOW_THREADS,
+ groupedThreads("onos/workflow-exec", "worker-%d", log));
+ handlerTaskBatchExecutor = newSingleThreadExecutor(
+ groupedThreads("onos/workflow", "handlertask-batch", log));
+ handlerTaskExecutor = newFixedThreadPool(DEFAULT_EVENTTASK_THREADS,
+ groupedThreads("onos/handlertask-exec", "worker-%d", log));
+ eventMapTriggerExecutor = newSingleThreadScheduledExecutor(
+ groupedThreads("onos/workflow-engine", "eventmap-trigger-executor"));
+
+ (new WorkplaceWorkflow(this, workflowStore)).registerWorkflows();
+ JsonDataModelTree data = new JsonDataModelTree(JsonNodeFactory.instance.objectNode());
+ workplaceStore.registerWorkplace(Workplace.SYSTEM_WORKPLACE,
+ new DefaultWorkplace(Workplace.SYSTEM_WORKPLACE, data));
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ leadershipService.withdraw(appId.name());
+ workplaceStore.unsetDelegate(workplaceStoreDelegate);
+ workflowBatchExecutor.shutdown();
+ workflowExecutor.shutdown();
+ handlerTaskBatchExecutor.shutdown();
+ handlerTaskExecutor.shutdown();
+ eventMapTriggerExecutor.shutdown();
+ log.info("Stopped");
+ }
+
+ @Override
+ public void execInitWorklet(WorkflowContext context) {
+
+ Workflow workflow = workflowStore.get(context.workflowId());
+ if (workflow == null) {
+ log.error("Invalid workflow {}", context.workflowId());
+ return;
+ }
+
+ initWorkletExecution(context);
+ try {
+ Worklet initWorklet = workflow.init(context);
+ if (initWorklet != null) {
+ initWorklet.process(context);
+ }
+
+ } catch (WorkflowException e) {
+ log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ context.setCause(e.getMessage());
+ context.setState(WorkflowState.EXCEPTION);
+ workplaceStore.commitContext(context.name(), context, false);
+ return;
+ }
+ // trigger the execution of next worklet.
+ workplaceStore.registerContext(context.name(), context);
+ }
+
+ @Override
+ public void eventMapTrigger(Event event, EventHintSupplier supplier) {
+
+ if (event.subject() instanceof SystemWorkflowContext) {
+ return;
+ }
+
+ Map<String, String> eventMap;
+
+ String eventHint;
+ try {
+ eventHint = supplier.apply(event);
+ } catch (Throwable e) {
+ log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ return;
+ }
+ if (eventHint == null) {
+ // do nothing
+ log.error("Invalid eventHint, event: {}", event);
+ return;
+ }
+
+ try {
+ eventMap = eventMapStore.getEventMap(event.getClass().getName(), eventHint);
+ } catch (WorkflowException e) {
+ log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ return;
+ }
+
+ if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
+ // do nothing;
+ log.debug("Invalid eventMap, event: {}", event);
+ return;
+ }
+
+ for (Map.Entry<String, String> entry : eventMap.entrySet()) {
+ String contextName = entry.getKey();
+ String workletType = entry.getValue();
+ WorkflowContext context = workplaceStore.getContext(contextName);
+ if (Objects.isNull(context)) {
+ log.info("Invalid context: {}, event: {}", contextName, event);
+ continue;
+ }
+ EventTask eventtask = EventTask.builder()
+ .event(event)
+ .eventHint(eventHint)
+ .context(context)
+ .workletType(workletType)
+ .build();
+
+ log.info("eventtaskAccumulator.add: task: {}", eventtask);
+ eventtaskAccumulator.add(eventtask);
+ }
+ }
+
+ @Override
+ public void registerEventMap(Class<? extends Event> eventType, String eventHint,
+ String contextName, String workletType) throws WorkflowException {
+ eventMapStore.registerEventMap(eventType.getName(), eventHint, contextName, workletType);
+ for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
+ Map<String, String> eventMap = eventMapStore.getEventMap(eventType.getName(), eventHint);
+ if (eventMap != null && eventMap.containsKey(contextName)) {
+ return;
+ }
+ try {
+ log.info("sleep {}", i);
+ Thread.sleep(10L * (i + 1));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ protected void post(WorkflowDataEvent event) {
+
+ if (event.subject() == null || !isRelevant(event.subject())) {
+ log.debug("ignore event {}", event);
+ return;
+ }
+
+ // trigger next worklet selection
+ WorkflowData dataModelContainer = event.subject();
+ switch (event.type()) {
+ case INSERT:
+ case UPDATE:
+ if (dataModelContainer.triggerNext()) {
+ log.info("workflowAccumulator.add: {}", dataModelContainer);
+ workflowAccumulator.add(dataModelContainer);
+ } else {
+ log.debug("pass-workflowAccumulator.add: {}", dataModelContainer);
+ }
+ break;
+ case REMOVE:
+ break;
+ default:
+ }
+
+ // trigger EventTask for WorkflowDataEvent
+ eventMapTriggerExecutor.submit(
+ () -> eventMapTrigger(
+ event,
+ // event hint supplier
+ (ev) -> {
+ if (ev == null || ev.subject() == null) {
+ return null;
+ }
+
+ if (ev.subject() instanceof WorkflowData) {
+ return ((WorkflowData) ev.subject()).name();
+ } else {
+ return null;
+ }
+ }
+ )
+ );
+ }
+
+ /**
+ * Checks whether this workflow data job is relevant to this ONOS node.
+ * @param job workflow data
+ * @return checking result
+ */
+ private boolean isRelevant(WorkflowData job) {
+ // distributes event processing with work-partition
+ return partitionService.isMine(job.distributor(), this::stringHash);
+ }
+
+ /**
+ * Gets hash of the string.
+ * @param str string to get a hash
+ * @return hash value
+ */
+ public Long stringHash(String str) {
+ return UUID.nameUUIDFromBytes(str.getBytes()).getMostSignificantBits();
+ }
+
+ /**
+ * Class for handler task batch delegation.
+ */
+ private class InternalHandlerTaskBatchDelegate implements HandlerTaskBatchDelegate {
+ @Override
+ public void execute(Collection<Collection<HandlerTask>> operations) {
+ log.debug("Execute {} operation(s).", operations.size());
+
+ CompletableFuture.runAsync(() -> {
+ List<CompletableFuture<Collection<HandlerTask>>> futures = operations.stream()
+ .map(
+ x -> CompletableFuture.completedFuture(x)
+ .thenApplyAsync(WorkFlowEngine.this::processHandlerTask, handlerTaskExecutor)
+ .exceptionally(e -> null)
+ )
+ .collect(Collectors.toList());
+
+ // waiting the completion of futures
+ futures.parallelStream().forEach(x -> x.join());
+
+ }, handlerTaskBatchExecutor).exceptionally(e -> {
+ log.error("Error submitting batches:", e);
+ return null;
+ }).thenRun(eventtaskAccumulator::ready);
+ }
+ }
+
+ /**
+ * Initializes worklet execution.
+ * @param context workflow context
+ */
+ private void initWorkletExecution(WorkflowContext context) {
+ context.setState(WorkflowState.RUNNING);
+ context.setCause("");
+ context.setWorkflowExecutionService(this);
+ context.setWorkflowStore(workflowStore);
+ context.setWorkplaceStore(workplaceStore);
+ context.waitCompletion(null, null, null, 0L);
+ context.setTriggerNext(false);
+ }
+
+ /**
+ * Processes handler tasks.
+ * @param tasks handler tasks
+ * @return handler tasks processed
+ */
+ private Collection<HandlerTask> processHandlerTask(Collection<HandlerTask> tasks) {
+
+ for (HandlerTask task : tasks) {
+ if (task instanceof EventTimeoutTask) {
+ execEventTimeoutTask((EventTimeoutTask) task);
+ } else if (task instanceof TimeoutTask) {
+ execTimeoutTask((TimeoutTask) task);
+ } else if (task instanceof EventTask) {
+ execEventTask((EventTask) task);
+ } else {
+ log.error("Unsupported handler task {}", task);
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Executes event task.
+ * @param task event task
+ * @return event task
+ */
+ private EventTask execEventTask(EventTask task) {
+
+ Map<String, String> eventMap = null;
+ try {
+ eventMap = eventMapStore.getEventMap(task.event().getClass().getName(), task.eventHint());
+ } catch (WorkflowException e) {
+ log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ return task;
+ }
+
+ if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
+ return task;
+ }
+
+ if (Objects.isNull(eventMap.get(task.context().name()))) {
+ return task;
+ }
+
+ log.debug("execEventTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
+
+ WorkflowContext context = (WorkflowContext) (task.context());
+ Workflow workflow = workflowStore.get(context.workflowId());
+ if (workflow == null) {
+ log.error("Invalid workflow {}", context.workflowId());
+ return task;
+ }
+
+ WorkflowContext latestContext = workplaceStore.getContext(context.name());
+ if (latestContext == null) {
+ log.error("Invalid workflow context {}", context.name());
+ return task;
+ }
+
+ try {
+ Worklet worklet = workflow.getWorkletInstance(task.workletType());
+ if (!Objects.equals(latestContext.current(), worklet.tag())) {
+ log.error("Current worklet({}) is not mismatched with task work({}). Ignored.",
+ latestContext.current(), worklet.tag());
+ return task;
+ }
+
+ if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
+ log.error("Current worklet is {}, Ignored", worklet);
+ return task;
+ }
+
+ initWorkletExecution(latestContext);
+
+ log.info("processHandlerTask.isCompleted-task:{}, latest:{}", task, latestContext);
+ if (worklet.isCompleted(latestContext, task.event())) {
+ eventMapStore.unregisterEventMap(
+ task.eventType(), task.eventHint(), latestContext.name());
+
+ workplaceStore.commitContext(latestContext.name(), latestContext, true);
+ return null;
+ } else {
+ workplaceStore.commitContext(latestContext.name(), latestContext, false);
+ }
+
+ } catch (WorkflowException e) {
+ log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ latestContext.setCause(e.getMessage());
+ latestContext.setState(WorkflowState.EXCEPTION);
+ workplaceStore.commitContext(latestContext.name(), latestContext, false);
+ }
+
+ return task;
+ }
+
+ /**
+ * Executes event timeout task.
+ * @param task event timeout task
+ * @return handler task
+ */
+ private HandlerTask execEventTimeoutTask(EventTimeoutTask task) {
+
+ Map<String, String> eventMap = null;
+ try {
+ eventMap = eventMapStore.getEventMap(task.eventType(), task.eventHint());
+ } catch (WorkflowException e) {
+ log.error("execEventTimeoutTask: Exception: {}, trace: {}",
+ e, Lists.newArrayList(e.getStackTrace()));
+ return task;
+ }
+
+ if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
+ return task;
+ }
+
+ if (Objects.isNull(eventMap.get(task.context().name()))) {
+ return task;
+ }
+
+ log.debug("execEventTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
+
+ WorkflowContext context = task.context();
+ Workflow workflow = workflowStore.get(context.workflowId());
+ if (workflow == null) {
+ log.error("execEventTimeoutTask: Invalid workflow {}", context.workflowId());
+ return task;
+ }
+
+ WorkflowContext latestContext = workplaceStore.getContext(context.name());
+ if (latestContext == null) {
+ log.error("execEventTimeoutTask: Invalid workflow context {}", context.name());
+ return task;
+ }
+
+ try {
+ Worklet worklet = workflow.getWorkletInstance(task.workletType());
+ if (!Objects.equals(latestContext.current(), worklet.tag())) {
+ log.error("execEventTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
+ latestContext.current(), worklet.tag());
+ return task;
+ }
+
+ if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
+ log.error("execEventTimeoutTask: Current worklet is {}, Ignored", worklet);
+ return task;
+ }
+
+ initWorkletExecution(latestContext);
+
+ log.info("execEventTimeoutTask.timeout-task:{}, latest:{}", task, latestContext);
+ eventMapStore.unregisterEventMap(
+ task.eventType(), task.eventHint(), latestContext.name());
+
+ worklet.timeout(latestContext);
+ workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
+
+ } catch (WorkflowException e) {
+ log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ latestContext.setCause(e.getMessage());
+ latestContext.setState(WorkflowState.EXCEPTION);
+ workplaceStore.commitContext(latestContext.name(), latestContext, false);
+ }
+
+ return task;
+ }
+
+ /**
+ * Executes timeout task.
+ * @param task time out task
+ * @return handler task
+ */
+ private HandlerTask execTimeoutTask(TimeoutTask task) {
+
+ log.debug("execTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
+
+ WorkflowContext context = (WorkflowContext) (task.context());
+ Workflow workflow = workflowStore.get(context.workflowId());
+ if (workflow == null) {
+ log.error("execTimeoutTask: Invalid workflow {}", context.workflowId());
+ return task;
+ }
+
+ WorkflowContext latestContext = workplaceStore.getContext(context.name());
+ if (latestContext == null) {
+ log.error("execTimeoutTask: Invalid workflow context {}", context.name());
+ return task;
+ }
+
+ try {
+ Worklet worklet = workflow.getWorkletInstance(task.workletType());
+ if (!Objects.equals(latestContext.current(), worklet.tag())) {
+ log.error("execTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
+ latestContext.current(), worklet.tag());
+ return task;
+ }
+
+ if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
+ log.error("execTimeoutTask: Current worklet is {}, Ignored", worklet);
+ return task;
+ }
+
+ initWorkletExecution(latestContext);
+
+ worklet.timeout(latestContext);
+ workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
+
+ } catch (WorkflowException e) {
+ log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ latestContext.setCause(e.getMessage());
+ latestContext.setState(WorkflowState.EXCEPTION);
+ workplaceStore.commitContext(latestContext.name(), latestContext, false);
+ }
+
+ return task;
+ }
+
+ /**
+ * Class for delegation of workflow batch execution.
+ */
+ private class InternalWorkflowBatchDelegate implements WorkflowBatchDelegate {
+ @Override
+ public void execute(Collection<WorkflowData> operations) {
+ log.debug("Execute {} operation(s).", operations.size());
+
+ CompletableFuture.runAsync(() -> {
+ List<CompletableFuture<WorkflowData>> futures = operations.stream()
+ .map(
+ x -> CompletableFuture.completedFuture(x)
+ .thenApplyAsync(WorkFlowEngine.this::execWorkflow, workflowExecutor)
+ .exceptionally(e -> null)
+ )
+ .collect(Collectors.toList());
+
+ // waiting the completion of futures
+ futures.parallelStream().forEach(x -> x.join());
+
+ }, workflowBatchExecutor).exceptionally(e -> {
+ log.error("Error submitting batches:", e);
+ return null;
+ }).thenRun(workflowAccumulator::ready);
+ }
+ }
+
+ /**
+ * Executes workflow.
+ * @param dataModelContainer workflow data model container(workflow or workplace)
+ * @return
+ */
+ private WorkflowData execWorkflow(WorkflowData dataModelContainer) {
+ if (dataModelContainer instanceof WorkflowContext) {
+ return execWorkflowContext((WorkflowContext) dataModelContainer);
+ } else if (dataModelContainer instanceof Workplace) {
+ return execWorkplace((Workplace) dataModelContainer);
+ } else {
+ log.error("Invalid context {}", dataModelContainer);
+ return null;
+ }
+ }
+
+ /**
+ * Executes workflow context.
+ * @param context workflow context
+ * @return workflow context
+ */
+ private WorkflowContext execWorkflowContext(WorkflowContext context) {
+
+ Workflow workflow = workflowStore.get(context.workflowId());
+ if (workflow == null) {
+ log.error("Invalid workflow {}", context.workflowId());
+ return null;
+ }
+
+ final WorkflowContext latestContext = workplaceStore.getContext(context.name());
+ if (latestContext == null) {
+ log.error("Invalid workflow context {}", context.name());
+ return null;
+ }
+
+ initWorkletExecution(latestContext);
+
+ try {
+ final Worklet worklet = workflow.next(latestContext);
+
+ if (worklet == Worklet.Common.INIT) {
+ log.error("workflow.next gave INIT. It cannot be executed (context: {})", context.name());
+ return latestContext;
+ }
+
+ latestContext.setCurrent(worklet);
+ if (worklet == Worklet.Common.COMPLETED) {
+
+ if (workflow.attributes().contains(REMOVE_AFTER_COMPLETE)) {
+ workplaceStore.removeContext(latestContext.name());
+ return null;
+ } else {
+ latestContext.setState(WorkflowState.IDLE);
+ workplaceStore.commitContext(latestContext.name(), latestContext, false);
+ return latestContext;
+ }
+ }
+
+ log.info("execWorkflowContext.process:{}, {}", worklet.tag(), latestContext);
+ worklet.process(latestContext);
+
+ if (latestContext.completionEventType() != null) {
+ if (latestContext.completionEventGenerator() == null) {
+ String msg = String.format("Invalid exepecting event(%s), generator(%s)",
+ latestContext.completionEventType(),
+ latestContext.completionEventGenerator());
+ throw new WorkflowException(msg);
+ }
+
+ registerEventMap(latestContext.completionEventType(), latestContext.completionEventHint(),
+ latestContext.name(), worklet.tag());
+
+ latestContext.completionEventGenerator().apply();
+
+ if (latestContext.completionEventTimeout() != 0L) {
+ final EventTimeoutTask eventTimeoutTask = EventTimeoutTask.builder()
+ .context(latestContext)
+ .workletType(worklet.tag())
+ .eventType(latestContext.completionEventType().getName())
+ .eventHint(latestContext.completionEventHint())
+ .build();
+ timerChain.schedule(latestContext.completionEventTimeout(),
+ () -> {
+ eventtaskAccumulator.add(eventTimeoutTask);
+ });
+ }
+ } else {
+ if (latestContext.completionEventTimeout() != 0L) {
+ final TimeoutTask timeoutTask = TimeoutTask.builder()
+ .context(latestContext)
+ .workletType(worklet.tag())
+ .build();
+
+ timerChain.schedule(latestContext.completionEventTimeout(),
+ () -> {
+ eventtaskAccumulator.add(timeoutTask);
+ });
+ }
+ }
+
+ workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
+
+ } catch (WorkflowException e) {
+ log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ latestContext.setCause(e.getMessage());
+ latestContext.setState(WorkflowState.EXCEPTION);
+ workplaceStore.commitContext(latestContext.name(), latestContext, false);
+ }
+
+ return latestContext;
+ }
+
+ /**
+ * Execute workplace.
+ * @param workplace workplace
+ * @return workplace
+ */
+ private Workplace execWorkplace(Workplace workplace) {
+
+ return null;
+ }
+
+}
\ No newline at end of file