Merge remote-tracking branch 'origin/master' into merge-master

Change-Id: I4608093c4400a313b253508ac6bc8a84ecba5c7e
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
new file mode 100644
index 0000000..15cd8cd
--- /dev/null
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
@@ -0,0 +1,738 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.workflow.impl;
+
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.google.common.collect.Lists;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.workflow.api.DefaultWorkplace;
+import org.onosproject.workflow.api.EventHintSupplier;
+import org.onosproject.workflow.api.EventTask;
+import org.onosproject.workflow.api.JsonDataModelTree;
+import org.onosproject.workflow.api.SystemWorkflowContext;
+import org.onosproject.workflow.api.EventTimeoutTask;
+import org.onosproject.workflow.api.TimeoutTask;
+import org.onosproject.workflow.api.TimerChain;
+import org.onosproject.workflow.api.Worklet;
+import org.onosproject.workflow.api.Workflow;
+import org.onosproject.workflow.api.WorkflowContext;
+import org.onosproject.workflow.api.WorkflowData;
+import org.onosproject.workflow.api.ContextEventMapStore;
+import org.onosproject.workflow.api.WorkflowState;
+import org.onosproject.workflow.api.WorkflowStore;
+import org.onosproject.workflow.api.WorkflowBatchDelegate;
+import org.onosproject.workflow.api.WorkflowDataEvent;
+import org.onosproject.workflow.api.WorkflowDataListener;
+import org.onosproject.workflow.api.WorkflowException;
+import org.onosproject.workflow.api.HandlerTask;
+import org.onosproject.workflow.api.HandlerTaskBatchDelegate;
+import org.onosproject.workflow.api.Workplace;
+import org.onosproject.workflow.api.WorkplaceStore;
+import org.onosproject.workflow.api.WorkplaceStoreDelegate;
+import org.onosproject.workflow.api.WorkflowExecutionService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.event.Event;
+import org.onosproject.net.intent.WorkPartitionService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.workflow.api.WorkflowAttribute.REMOVE_AFTER_COMPLETE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+@Component(immediate = true, service = WorkflowExecutionService.class)
+public class WorkFlowEngine extends AbstractListenerManager<WorkflowDataEvent, WorkflowDataListener>
+        implements WorkflowExecutionService {
+
+    protected static final Logger log = getLogger(WorkFlowEngine.class);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected WorkPartitionService partitionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected WorkplaceStore workplaceStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected WorkflowStore workflowStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ContextEventMapStore eventMapStore;
+
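+    // Workplace store changes are relayed to post(), which dispatches them to listeners and drives worklet scheduling.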
+    private final WorkplaceStoreDelegate workplaceStoreDelegate = this::post;
+
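+    // Accumulators batch incoming workflow data and handler tasks before the delegates below execute them.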
+    private final WorkflowBatchDelegate workflowBatchDelegate = new InternalWorkflowBatchDelegate();
+    private final WorkflowAccumulator workflowAccumulator = new WorkflowAccumulator(workflowBatchDelegate);
+
+    private final HandlerTaskBatchDelegate eventtaskBatchDelegate = new InternalHandlerTaskBatchDelegate();
+    private final HandlerTaskAccumulator eventtaskAccumulator = new HandlerTaskAccumulator(eventtaskBatchDelegate);
+
+    private ExecutorService workflowBatchExecutor;
+    private ExecutorService workflowExecutor;
+
+    private ExecutorService handlerTaskBatchExecutor;
+    private ExecutorService handlerTaskExecutor;
+
+    private static final int DEFAULT_WORKFLOW_THREADS = 12;
+    private static final int DEFAULT_EVENTTASK_THREADS = 12;
+    private static final int MAX_REGISTER_EVENTMAP_WAITS = 10;
+
+    private ScheduledExecutorService eventMapTriggerExecutor;
+
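+    // Timer chain used to schedule timeout tasks for worklets awaiting completion events.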
+    private TimerChain timerChain = new TimerChain();
+
+    public static final String APPID = "org.onosproject.workflow";
+    private ApplicationId appId;
+    private NodeId localNodeId;
+
+    @Activate
+    public void activate() {
+        appId = coreService.registerApplication(APPID);
+        workplaceStore.setDelegate(workplaceStoreDelegate);
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.runForLeadership(appId.name());
+
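+        // Single-thread executors serialize batch submission; fixed-size pools run the individual work items.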
+        workflowBatchExecutor = newSingleThreadExecutor(
+                groupedThreads("onos/workflow", "workflow-batch", log));
+        workflowExecutor = newFixedThreadPool(DEFAULT_WORKFLOW_THREADS,
+                groupedThreads("onos/workflow-exec", "worker-%d", log));
+        handlerTaskBatchExecutor = newSingleThreadExecutor(
+                groupedThreads("onos/workflow", "handlertask-batch", log));
+        handlerTaskExecutor = newFixedThreadPool(DEFAULT_EVENTTASK_THREADS,
+                groupedThreads("onos/handlertask-exec", "worker-%d", log));
+        eventMapTriggerExecutor = newSingleThreadScheduledExecutor(
+                groupedThreads("onos/workflow-engine", "eventmap-trigger-executor"));
+
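+        // Register the workplace workflows and create the system workplace with an empty JSON data model tree.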
+        (new WorkplaceWorkflow(this, workflowStore)).registerWorkflows();
+        JsonDataModelTree data = new JsonDataModelTree(JsonNodeFactory.instance.objectNode());
+        workplaceStore.registerWorkplace(Workplace.SYSTEM_WORKPLACE,
+                new DefaultWorkplace(Workplace.SYSTEM_WORKPLACE, data));
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        leadershipService.withdraw(appId.name());
+        workplaceStore.unsetDelegate(workplaceStoreDelegate);
+        workflowBatchExecutor.shutdown();
+        workflowExecutor.shutdown();
+        handlerTaskBatchExecutor.shutdown();
+        handlerTaskExecutor.shutdown();
+        eventMapTriggerExecutor.shutdown();
+        log.info("Stopped");
+    }
+
+    @Override
+    public void execInitWorklet(WorkflowContext context) {
+
+        Workflow workflow = workflowStore.get(context.workflowId());
+        if (workflow == null) {
+            log.error("Invalid workflow {}", context.workflowId());
+            return;
+        }
+
+        initWorkletExecution(context);
+        try {
+            Worklet initWorklet = workflow.init(context);
+            if (initWorklet != null) {
+                initWorklet.process(context);
+            }
+
+        } catch (WorkflowException e) {
+            log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+            context.setCause(e.getMessage());
+            context.setState(WorkflowState.EXCEPTION);
+            workplaceStore.commitContext(context.name(), context, false);
+            return;
+        }
+        // trigger the execution of next worklet.
+        workplaceStore.registerContext(context.name(), context);
+    }
+
+    @Override
+    public void eventMapTrigger(Event event, EventHintSupplier supplier) {
+
+        if (event.subject() instanceof SystemWorkflowContext) {
+            return;
+        }
+
+        Map<String, String> eventMap;
+
+        String eventHint;
+        try {
+            eventHint = supplier.apply(event);
+        } catch (Throwable e) {
+            log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+            return;
+        }
+        if (eventHint == null) {
+            // nothing to trigger without an event hint
+            log.error("Invalid eventHint, event: {}", event);
+            return;
+        }
+
+        try {
+            eventMap = eventMapStore.getEventMap(event.getClass().getName(), eventHint);
+        } catch (WorkflowException e) {
+            log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+            return;
+        }
+
+        if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
+            // no context is waiting on this event; nothing to trigger
+            log.debug("Invalid eventMap, event: {}", event);
+            return;
+        }
+
+        for (Map.Entry<String, String> entry : eventMap.entrySet()) {
+            String contextName = entry.getKey();
+            String workletType = entry.getValue();
+            WorkflowContext context = workplaceStore.getContext(contextName);
+            if (Objects.isNull(context)) {
+                log.info("Invalid context: {}, event: {}", contextName, event);
+                continue;
+            }
+            EventTask eventtask = EventTask.builder()
+                    .event(event)
+                    .eventHint(eventHint)
+                    .context(context)
+                    .workletType(workletType)
+                    .build();
+
+            log.info("eventtaskAccumulator.add: task: {}", eventtask);
+            eventtaskAccumulator.add(eventtask);
+        }
+    }
+
+    @Override
+    public void registerEventMap(Class<? extends Event> eventType, String eventHint,
+                                 String contextName, String workletType) throws WorkflowException {
+        eventMapStore.registerEventMap(eventType.getName(), eventHint, contextName, workletType);
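+        // Registration may take effect asynchronously; poll briefly until the event map reflects the new entry.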
+        for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
+            Map<String, String> eventMap = eventMapStore.getEventMap(eventType.getName(), eventHint);
+            if (eventMap != null && eventMap.containsKey(contextName)) {
+                return;
+            }
+            try {
+                log.info("Waiting for event map registration, attempt {}", i);
+                Thread.sleep(10L * (i + 1));
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("Interrupted while waiting for event map registration", e);
+            }
+        }
+    }
+
+    @Override
+    protected void post(WorkflowDataEvent event) {
+
+        if (event.subject() == null || !isRelevant(event.subject())) {
+            log.debug("ignore event {}", event);
+            return;
+        }
+
+        // trigger next worklet selection
+        WorkflowData dataModelContainer = event.subject();
+        switch (event.type()) {
+            case INSERT:
+            case UPDATE:
+                if (dataModelContainer.triggerNext()) {
+                    log.info("workflowAccumulator.add: {}", dataModelContainer);
+                    workflowAccumulator.add(dataModelContainer);
+                } else {
+                    log.debug("pass-workflowAccumulator.add: {}", dataModelContainer);
+                }
+                break;
+            case REMOVE:
+                break;
+            default:
+        }
+
+        // trigger EventTask for WorkflowDataEvent
+        eventMapTriggerExecutor.submit(
+                () -> eventMapTrigger(
+                        event,
+                        // event hint supplier
+                        (ev) -> {
+                            if (ev == null || ev.subject() == null) {
+                                return null;
+                            }
+
+                            if (ev.subject() instanceof WorkflowData) {
+                                return ((WorkflowData) ev.subject()).name();
+                            } else {
+                                return null;
+                            }
+                        }
+                )
+        );
+    }
+
+    /**
+     * Checks whether this workflow data job is relevant to this ONOS node.
+     * @param job workflow data
+     * @return checking result
+     */
+    private boolean isRelevant(WorkflowData job) {
+        // distributes event processing with work-partition
+        return partitionService.isMine(job.distributor(), this::stringHash);
+    }
+
+    /**
+     * Gets hash of the string.
+     * @param str string to get a hash
+     * @return hash value
+     */
+    public Long stringHash(String str) {
+        return UUID.nameUUIDFromBytes(str.getBytes()).getMostSignificantBits();
+    }
+
+    /**
+     * Class for handler task batch delegation.
+     */
+    private class InternalHandlerTaskBatchDelegate implements HandlerTaskBatchDelegate {
+        @Override
+        public void execute(Collection<Collection<HandlerTask>> operations) {
+            log.debug("Execute {} operation(s).", operations.size());
+
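+            // Dispatch each group of handler tasks to the handler task pool and signal the accumulator once all groups finish.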
+            CompletableFuture.runAsync(() -> {
+                List<CompletableFuture<Collection<HandlerTask>>> futures = operations.stream()
+                        .map(
+                                x -> CompletableFuture.completedFuture(x)
+                                        .thenApplyAsync(WorkFlowEngine.this::processHandlerTask, handlerTaskExecutor)
+                                        .exceptionally(e -> null)
+                        )
+                        .collect(Collectors.toList());
+
+                // wait for all futures to complete
+                futures.parallelStream().forEach(x -> x.join());
+
+            }, handlerTaskBatchExecutor).exceptionally(e -> {
+                log.error("Error submitting batches:", e);
+                return null;
+            }).thenRun(eventtaskAccumulator::ready);
+        }
+    }
+
+    /**
+     * Initializes worklet execution.
+     * @param context workflow context
+     */
+    private void initWorkletExecution(WorkflowContext context) {
+        context.setState(WorkflowState.RUNNING);
+        context.setCause("");
+        context.setWorkflowExecutionService(this);
+        context.setWorkflowStore(workflowStore);
+        context.setWorkplaceStore(workplaceStore);
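+        // Reset any previously registered completion event expectation and timeout.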
+        context.waitCompletion(null, null, null, 0L);
+        context.setTriggerNext(false);
+    }
+
+    /**
+     * Processes handler tasks.
+     * @param tasks handler tasks
+     * @return always null (the result is not used by the batch delegate)
+     */
+    private Collection<HandlerTask> processHandlerTask(Collection<HandlerTask> tasks) {
+
+        for (HandlerTask task : tasks) {
+            if (task instanceof EventTimeoutTask) {
+                execEventTimeoutTask((EventTimeoutTask) task);
+            } else if (task instanceof TimeoutTask) {
+                execTimeoutTask((TimeoutTask) task);
+            } else if (task instanceof EventTask) {
+                execEventTask((EventTask) task);
+            } else {
+                log.error("Unsupported handler task {}", task);
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Executes event task.
+     * @param task event task
+     * @return the given event task, or null if the worklet completed on this event
+     */
+    private EventTask execEventTask(EventTask task) {
+
+        Map<String, String> eventMap = null;
+        try {
+            eventMap = eventMapStore.getEventMap(task.event().getClass().getName(), task.eventHint());
+        } catch (WorkflowException e) {
+            log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+            return task;
+        }
+
+        if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
+            return task;
+        }
+
+        if (Objects.isNull(eventMap.get(task.context().name()))) {
+            return task;
+        }
+
+        log.debug("execEventTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
+
+        WorkflowContext context = (WorkflowContext) (task.context());
+        Workflow workflow = workflowStore.get(context.workflowId());
+        if (workflow == null) {
+            log.error("Invalid workflow {}", context.workflowId());
+            return task;
+        }
+
+        WorkflowContext latestContext = workplaceStore.getContext(context.name());
+        if (latestContext == null) {
+            log.error("Invalid workflow context {}", context.name());
+            return task;
+        }
+
+        try {
+            Worklet worklet = workflow.getWorkletInstance(task.workletType());
+            if (!Objects.equals(latestContext.current(), worklet.tag())) {
+                log.error("Current worklet({}) is not mismatched with task work({}). Ignored.",
+                        latestContext.current(), worklet.tag());
+                return task;
+            }
+
+            if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
+                log.error("Current worklet is {}, Ignored", worklet);
+                return task;
+            }
+
+            initWorkletExecution(latestContext);
+
+            log.info("processHandlerTask.isCompleted-task:{}, latest:{}", task, latestContext);
+            if (worklet.isCompleted(latestContext, task.event())) {
+                eventMapStore.unregisterEventMap(
+                        task.eventType(), task.eventHint(), latestContext.name());
+
+                workplaceStore.commitContext(latestContext.name(), latestContext, true);
+                return null;
+            } else {
+                workplaceStore.commitContext(latestContext.name(), latestContext, false);
+            }
+
+        } catch (WorkflowException e) {
+            log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+            latestContext.setCause(e.getMessage());
+            latestContext.setState(WorkflowState.EXCEPTION);
+            workplaceStore.commitContext(latestContext.name(), latestContext, false);
+        }
+
+        return task;
+    }
+
+    /**
+     * Executes event timeout task.
+     * @param task event timeout task
+     * @return handler task
+     */
+    private HandlerTask execEventTimeoutTask(EventTimeoutTask task) {
+
+        Map<String, String> eventMap = null;
+        try {
+            eventMap = eventMapStore.getEventMap(task.eventType(), task.eventHint());
+        } catch (WorkflowException e) {
+            log.error("execEventTimeoutTask: Exception: {}, trace: {}",
+                    e, Lists.newArrayList(e.getStackTrace()));
+            return task;
+        }
+
+        if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
+            return task;
+        }
+
+        if (Objects.isNull(eventMap.get(task.context().name()))) {
+            return task;
+        }
+
+        log.debug("execEventTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
+
+        WorkflowContext context = task.context();
+        Workflow workflow = workflowStore.get(context.workflowId());
+        if (workflow == null) {
+            log.error("execEventTimeoutTask: Invalid workflow {}", context.workflowId());
+            return task;
+        }
+
+        WorkflowContext latestContext = workplaceStore.getContext(context.name());
+        if (latestContext == null) {
+            log.error("execEventTimeoutTask: Invalid workflow context {}", context.name());
+            return task;
+        }
+
+        try {
+            Worklet worklet = workflow.getWorkletInstance(task.workletType());
+            if (!Objects.equals(latestContext.current(), worklet.tag())) {
+                log.error("execEventTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
+                        latestContext.current(), worklet.tag());
+                return task;
+            }
+
+            if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
+                log.error("execEventTimeoutTask: Current worklet is {}, Ignored", worklet);
+                return task;
+            }
+
+            initWorkletExecution(latestContext);
+
+            log.info("execEventTimeoutTask.timeout-task:{}, latest:{}", task, latestContext);
+            eventMapStore.unregisterEventMap(
+                    task.eventType(), task.eventHint(), latestContext.name());
+
+            worklet.timeout(latestContext);
+            workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
+
+        } catch (WorkflowException e) {
+            log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+            latestContext.setCause(e.getMessage());
+            latestContext.setState(WorkflowState.EXCEPTION);
+            workplaceStore.commitContext(latestContext.name(), latestContext, false);
+        }
+
+        return task;
+    }
+
+    /**
+     * Executes timeout task.
+     * @param task timeout task
+     * @return handler task
+     */
+    private HandlerTask execTimeoutTask(TimeoutTask task) {
+
+        log.debug("execTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
+
+        WorkflowContext context = (WorkflowContext) (task.context());
+        Workflow workflow = workflowStore.get(context.workflowId());
+        if (workflow == null) {
+            log.error("execTimeoutTask: Invalid workflow {}", context.workflowId());
+            return task;
+        }
+
+        WorkflowContext latestContext = workplaceStore.getContext(context.name());
+        if (latestContext == null) {
+            log.error("execTimeoutTask: Invalid workflow context {}", context.name());
+            return task;
+        }
+
+        try {
+            Worklet worklet = workflow.getWorkletInstance(task.workletType());
+            if (!Objects.equals(latestContext.current(), worklet.tag())) {
+                log.error("execTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
+                        latestContext.current(), worklet.tag());
+                return task;
+            }
+
+            if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
+                log.error("execTimeoutTask: Current worklet is {}, Ignored", worklet);
+                return task;
+            }
+
+            initWorkletExecution(latestContext);
+
+            worklet.timeout(latestContext);
+            workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
+
+        } catch (WorkflowException e) {
+            log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+            latestContext.setCause(e.getMessage());
+            latestContext.setState(WorkflowState.EXCEPTION);
+            workplaceStore.commitContext(latestContext.name(), latestContext, false);
+        }
+
+        return task;
+    }
+
+    /**
+     * Class for delegation of workflow batch execution.
+     */
+    private class InternalWorkflowBatchDelegate implements WorkflowBatchDelegate {
+        @Override
+        public void execute(Collection<WorkflowData> operations) {
+            log.debug("Execute {} operation(s).", operations.size());
+
+            CompletableFuture.runAsync(() -> {
+                List<CompletableFuture<WorkflowData>> futures = operations.stream()
+                        .map(
+                                x -> CompletableFuture.completedFuture(x)
+                                .thenApplyAsync(WorkFlowEngine.this::execWorkflow, workflowExecutor)
+                                .exceptionally(e -> null)
+                        )
+                        .collect(Collectors.toList());
+
+                // wait for all futures to complete
+                futures.parallelStream().forEach(x -> x.join());
+
+            }, workflowBatchExecutor).exceptionally(e -> {
+                log.error("Error submitting batches:", e);
+                return null;
+            }).thenRun(workflowAccumulator::ready);
+        }
+    }
+
+    /**
+     * Executes workflow.
+     * @param dataModelContainer workflow data model container (workflow context or workplace)
+     * @return the processed workflow data, or null
+     */
+    private WorkflowData execWorkflow(WorkflowData dataModelContainer) {
+        if (dataModelContainer instanceof WorkflowContext) {
+            return execWorkflowContext((WorkflowContext) dataModelContainer);
+        } else if (dataModelContainer instanceof Workplace) {
+            return execWorkplace((Workplace) dataModelContainer);
+        } else {
+            log.error("Invalid context {}", dataModelContainer);
+            return null;
+        }
+    }
+
+    /**
+     * Executes workflow context.
+     * @param context workflow context
+     * @return workflow context
+     */
+    private WorkflowContext execWorkflowContext(WorkflowContext context) {
+
+        Workflow workflow = workflowStore.get(context.workflowId());
+        if (workflow == null) {
+            log.error("Invalid workflow {}", context.workflowId());
+            return null;
+        }
+
+        final WorkflowContext latestContext = workplaceStore.getContext(context.name());
+        if (latestContext == null) {
+            log.error("Invalid workflow context {}", context.name());
+            return null;
+        }
+
+        initWorkletExecution(latestContext);
+
+        try {
+            final Worklet worklet = workflow.next(latestContext);
+
+            if (worklet == Worklet.Common.INIT) {
+                log.error("workflow.next gave INIT. It cannot be executed (context: {})", context.name());
+                return latestContext;
+            }
+
+            latestContext.setCurrent(worklet);
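+            // COMPLETED ends the workflow: remove the context if the workflow asks for it, otherwise park it as IDLE.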
+            if (worklet == Worklet.Common.COMPLETED) {
+
+                if (workflow.attributes().contains(REMOVE_AFTER_COMPLETE)) {
+                    workplaceStore.removeContext(latestContext.name());
+                    return null;
+                } else {
+                    latestContext.setState(WorkflowState.IDLE);
+                    workplaceStore.commitContext(latestContext.name(), latestContext, false);
+                    return latestContext;
+                }
+            }
+
+            log.info("execWorkflowContext.process:{}, {}", worklet.tag(), latestContext);
+            worklet.process(latestContext);
+
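+            // When the worklet expects a completion event, map it to this context, invoke the generator and
+            // arm an event timeout; otherwise arm a plain timeout if one was requested.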
+            if (latestContext.completionEventType() != null) {
+                if (latestContext.completionEventGenerator() == null) {
+                    String msg = String.format("Invalid exepecting event(%s), generator(%s)",
+                            latestContext.completionEventType(),
+                            latestContext.completionEventGenerator());
+                    throw new WorkflowException(msg);
+                }
+
+                registerEventMap(latestContext.completionEventType(), latestContext.completionEventHint(),
+                        latestContext.name(), worklet.tag());
+
+                latestContext.completionEventGenerator().apply();
+
+                if (latestContext.completionEventTimeout() != 0L) {
+                    final EventTimeoutTask eventTimeoutTask = EventTimeoutTask.builder()
+                            .context(latestContext)
+                            .workletType(worklet.tag())
+                            .eventType(latestContext.completionEventType().getName())
+                            .eventHint(latestContext.completionEventHint())
+                            .build();
+                    timerChain.schedule(latestContext.completionEventTimeout(),
+                            () -> {
+                                eventtaskAccumulator.add(eventTimeoutTask);
+                            });
+                }
+            } else {
+                if (latestContext.completionEventTimeout() != 0L) {
+                    final TimeoutTask timeoutTask = TimeoutTask.builder()
+                            .context(latestContext)
+                            .workletType(worklet.tag())
+                            .build();
+
+                    timerChain.schedule(latestContext.completionEventTimeout(),
+                            () -> {
+                                eventtaskAccumulator.add(timeoutTask);
+                            });
+                }
+            }
+
+            workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
+
+        } catch (WorkflowException e) {
+            log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+            latestContext.setCause(e.getMessage());
+            latestContext.setState(WorkflowState.EXCEPTION);
+            workplaceStore.commitContext(latestContext.name(), latestContext, false);
+        }
+
+        return latestContext;
+    }
+
+    /**
+     * Executes workplace.
+     * @param workplace workplace
+     * @return workplace (currently always null)
+     */
+    private Workplace execWorkplace(Workplace workplace) {
+
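+        // Workplace-level execution is currently a no-op.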
+        return null;
+    }
+
+}
\ No newline at end of file