/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.workflow.impl;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onosproject.event.Event;
import org.onosproject.workflow.api.AbstractWorklet;
import org.onosproject.workflow.api.DefaultWorkflowContext;
import org.onosproject.workflow.api.DefaultWorkplace;
import org.onosproject.workflow.api.ImmutableListWorkflow;
import org.onosproject.workflow.api.JsonDataModelTree;
import org.onosproject.workflow.api.SystemWorkflowContext;
import org.onosproject.workflow.api.Workflow;
import org.onosproject.workflow.api.WorkflowAttribute;
import org.onosproject.workflow.api.WorkflowContext;
import org.onosproject.workflow.api.WorkflowData;
import org.onosproject.workflow.api.WorkflowDataEvent;
import org.onosproject.workflow.api.DefaultWorkflowDescription;
import org.onosproject.workflow.api.WorkflowDescription;
import org.onosproject.workflow.api.WorkflowException;
import org.onosproject.workflow.api.WorkflowExecutionService;
import org.onosproject.workflow.api.WorkflowStore;
import org.onosproject.workflow.api.Workplace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.Objects;

import static org.onosproject.workflow.api.WorkflowDataEvent.Type.INSERT;

public class WorkplaceWorkflow {

    private static final Logger log = LoggerFactory.getLogger(WorkplaceWorkflow.class);

    private WorkflowExecutionService workflowExecutionService;
    private WorkflowStore workflowStore;

    public WorkplaceWorkflow(WorkflowExecutionService workflowExecutionService,
                             WorkflowStore workflowStore) {
        this.workflowExecutionService = workflowExecutionService;
        this.workflowStore = workflowStore;
    }

    public static final String WF_CREATE_WORKFLOW = "workplace.create-workflow";

    public void registerWorkflows() {

        Workflow workflow = ImmutableListWorkflow.builder()
                .id(URI.create(WF_CREATE_WORKFLOW))
                .attribute(WorkflowAttribute.REMOVE_AFTER_COMPLETE)
                .init(ChangeDistributor.class.getName())
                .chain(CreateWorkplace.class.getName())
                .chain(CreateWorkflowContext.class.getName())
                .build();
        workflowStore.register(workflow);
    }

    public abstract static class AbsWorkflowWorklet extends AbstractWorklet {

        protected WorkflowDescription getWorkflowDesc(WorkflowContext context) throws WorkflowException {

            JsonNode root = ((JsonDataModelTree) context.data()).root();
            return DefaultWorkflowDescription.valueOf(root);
        }
    }

    public static class ChangeDistributor extends AbsWorkflowWorklet {

        @Override
        public void process(WorkflowContext context) throws WorkflowException {

            String workplaceName = getWorkflowDesc(context).workplaceName();
            // Sets workflow job distribution hash value to make this workflow to be executed on the
            // same cluster node to execute workplace tasks.
            ((SystemWorkflowContext) context).setDistributor(workplaceName);

            context.completed();
       }
    }

    public static class CreateWorkplace extends AbsWorkflowWorklet {

        @Override
        public boolean isNext(WorkflowContext context) throws WorkflowException {

            WorkflowDescription wfDesc = getWorkflowDesc(context);

            Workplace workplace = context.workplaceStore().getWorkplace(wfDesc.workplaceName());
            return Objects.isNull(workplace);
        }

        @Override
        public void process(WorkflowContext context) throws WorkflowException {

            WorkflowDescription wfDesc = getWorkflowDesc(context);

            // creates workplace with empty data model
            DefaultWorkplace workplace = new DefaultWorkplace(wfDesc.workplaceName(), new JsonDataModelTree());
            log.info("registerWorkplace {}", workplace);
            context.waitCompletion(WorkflowDataEvent.class, wfDesc.workplaceName(),
                    () -> context.workplaceStore().registerWorkplace(wfDesc.workplaceName(), workplace),
                    60000L
            );
        }

        @Override
        public boolean isCompleted(WorkflowContext context, Event event)throws WorkflowException {

            if (!(event instanceof WorkflowDataEvent)) {
                return false;
            }

            WorkflowDataEvent wfEvent = (WorkflowDataEvent) event;
            WorkflowData wfData = wfEvent.subject();

            WorkflowDescription wfDesc = getWorkflowDesc(context);

            if (wfData instanceof Workplace
                    && Objects.equals(wfData.name(), wfDesc.workplaceName())
                    && wfEvent.type() == INSERT) {
                log.info("isCompleted(true): event:{}, context:{}, workplace:{}",
                        event, context, wfDesc.workplaceName());
                return true;
            } else {
                log.info("isCompleted(false) event:{}, context:{}, workplace:{}",
                        event, context, wfDesc.workplaceName());
                return false;
            }
        }
    }

    public static class CreateWorkflowContext extends AbsWorkflowWorklet {

        private static final String SUBMITTED = "submitted";

        private boolean isSubmitted(WorkflowContext context) throws WorkflowException {
            JsonNode node = ((JsonDataModelTree) context.data()).nodeAt("/" + SUBMITTED);
            if (!(node instanceof BooleanNode)) {
                return false;
            }
            return node.asBoolean();
        }

        private void submitTrue(WorkflowContext context) throws WorkflowException {
            JsonNode root = ((JsonDataModelTree) context.data()).root();
            if (!(root instanceof ObjectNode)) {
                throw new WorkflowException("Invalid root node for " + context);
            }
            ((ObjectNode) root).put(SUBMITTED, true);
        }

        @Override
        public boolean isNext(WorkflowContext context) throws WorkflowException {

            WorkflowDescription wfDesc = getWorkflowDesc(context);

            String contextName = DefaultWorkflowContext.nameBuilder(wfDesc.id(), wfDesc.workplaceName());
            if (Objects.isNull(context.workplaceStore().getContext(contextName))) {
                return (!isSubmitted(context));
            } else {
                return false;
            }
        }

        @Override
        public void process(WorkflowContext context) throws WorkflowException {

            WorkflowDescription wfDesc = getWorkflowDesc(context);

            Workplace workplace = context.workplaceStore().getWorkplace(wfDesc.workplaceName());
            if (Objects.isNull(workplace)) {

                log.error("Failed to find workplace with " + wfDesc.workplaceName());
                throw new WorkflowException("Failed to find workplace with " + wfDesc.workplaceName());
            }

            Workflow workflow = context.workflowStore().get(wfDesc.id());
            if (Objects.isNull(workflow)) {
                throw new WorkflowException("Failed to find workflow with " + wfDesc.id());
            }

            //String contextName = DefaultWorkflowContext.nameBuilder(wfDesc.id(), wfDesc.workplaceName());
            String contextName = wfDesc.workflowContextName();
            if (Objects.nonNull(context.workplaceStore().getContext(contextName))) {
                throw new WorkflowException(contextName + " exists already");
            }

            JsonDataModelTree subTree = new JsonDataModelTree(wfDesc.data());
            WorkflowContext buildingContext = workflow.buildContext(workplace, subTree);
            log.info("registerContext {}", buildingContext.name());
            context.workplaceStore().registerContext(buildingContext.name(), buildingContext);
            submitTrue(context);

            context.completed();
        }
    }
}
