/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.workflow.impl;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.BaseJsonNode;
import com.fasterxml.jackson.databind.node.BigIntegerNode;
import com.fasterxml.jackson.databind.node.BinaryNode;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.DecimalNode;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.FloatNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.JsonNodeCreator;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.MissingNode;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.NumericNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.POJONode;
import com.fasterxml.jackson.databind.node.ShortNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.fasterxml.jackson.databind.node.ValueNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.workflow.api.DataModelTree;
import org.onosproject.workflow.api.DefaultWorkplace;
import org.onosproject.workflow.api.DefaultWorkflowContext;
import org.onosproject.workflow.api.JsonDataModelTree;
import org.onosproject.workflow.api.SystemWorkflowContext;
import org.onosproject.workflow.api.WorkflowContext;
import org.onosproject.workflow.api.WorkflowData;
import org.onosproject.workflow.api.WorkflowState;
import org.onosproject.workflow.api.Workplace;
import org.onosproject.workflow.api.WorkflowDataEvent;
import org.onosproject.workflow.api.WorkplaceStore;
import org.onosproject.workflow.api.WorkplaceStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.slf4j.LoggerFactory.getLogger;

/**
 * Workplace store backed by two consistent maps obtained from the storage service:
 * one holding workplaces ("workplace-map") and one holding workflow contexts
 * ("workflow-context-map").
 *
 * <p>Writes go to the consistent maps. Reads are served from local caches which are
 * populated on activation and afterwards kept up to date by map event listeners.
 * Each map event is also forwarded to the store delegate as a {@link WorkflowDataEvent}.
 */
@Component(immediate = true)
@Service
public class DistributedWorkplaceStore
    extends AbstractStore<WorkflowDataEvent, WorkplaceStoreDelegate> implements WorkplaceStore {

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected CoreService coreService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected StorageService storageService;

    private ApplicationId appId;
    private final Logger log = getLogger(getClass());

    // Workplaces: consistent map, its listener, and the local cache keyed by workplace name.
    private final WorkplaceMapListener workplaceMapEventListener = new WorkplaceMapListener();
    private ConsistentMap<String, WorkflowData> workplaceMap;
    private final Map<String, Workplace> localWorkplaceMap = Maps.newConcurrentMap();

    // Workflow contexts: consistent map, its listener, and the local cache keyed by context name.
    private final WorkflowContextMapListener contextMapEventListener = new WorkflowContextMapListener();
    private ConsistentMap<String, WorkflowData> contextMap;
    private final Map<String, WorkflowContext> localContextMap = Maps.newConcurrentMap();

    // Membership index: workplace name -> (context name -> context).
    // Member maps are synchronized maps and are only mutated inside compute()/computeIfPresent()
    // on this outer map; readers must hold the member map's lock while traversing it.
    private final Map<String, Map<String, WorkflowContext>> localWorkplaceMemberMap = Maps.newConcurrentMap();

    /**
     * Registers the application, builds both consistent maps, attaches the map
     * listeners and fills the local caches from the current map contents.
     */
    @Activate
    public void activate() {

        appId = coreService.registerApplication("org.onosproject.workplacestore");
        log.info("appId={}", appId);

        // NOTE: the registration order below is kept exactly as it was; do not reorder.
        KryoNamespace workplaceNamespace = KryoNamespace.newBuilder()
                .register(KryoNamespaces.API)
                .register(WorkflowData.class)
                .register(Workplace.class)
                .register(DefaultWorkplace.class)
                .register(WorkflowContext.class)
                .register(DefaultWorkflowContext.class)
                .register(SystemWorkflowContext.class)
                .register(WorkflowState.class)
                .register(DataModelTree.class)
                .register(JsonDataModelTree.class)
                .register(List.class)
                .register(ArrayList.class)
                .register(JsonNode.class)
                .register(ObjectNode.class)
                .register(TextNode.class)
                .register(LinkedHashMap.class)
                .register(ArrayNode.class)
                .register(BaseJsonNode.class)
                .register(BigIntegerNode.class)
                .register(BinaryNode.class)
                .register(BooleanNode.class)
                .register(ContainerNode.class)
                .register(DecimalNode.class)
                .register(DoubleNode.class)
                .register(FloatNode.class)
                .register(IntNode.class)
                .register(JsonNodeType.class)
                .register(LongNode.class)
                .register(MissingNode.class)
                .register(NullNode.class)
                .register(NumericNode.class)
                .register(POJONode.class)
                .register(ShortNode.class)
                .register(ValueNode.class)
                .register(JsonNodeCreator.class)
                .register(JsonNodeFactory.class)
                .build();

        localWorkplaceMap.clear();
        workplaceMap = storageService.<String, WorkflowData>consistentMapBuilder()
                .withSerializer(Serializer.using(workplaceNamespace))
                .withName("workplace-map")
                .withApplicationId(appId)
                .build();
        workplaceMap.addListener(workplaceMapEventListener);

        localContextMap.clear();
        // The membership index is derived from the context map, so reset it together
        // with the context cache; otherwise entries from a previous activation linger.
        localWorkplaceMemberMap.clear();
        contextMap = storageService.<String, WorkflowData>consistentMapBuilder()
                .withSerializer(Serializer.using(workplaceNamespace))
                .withName("workflow-context-map")
                .withApplicationId(appId)
                .build();
        contextMap.addListener(contextMapEventListener);

        workplaceMapEventListener.syncLocal();
        contextMapEventListener.syncLocal();
        log.info("Started");
    }

    /**
     * Detaches the map listeners, destroys both consistent maps and clears all local caches.
     */
    @Deactivate
    public void deactivate() {
        // Detach first so no event is handled against caches that are being torn down.
        workplaceMap.removeListener(workplaceMapEventListener);
        workplaceMap.destroy();
        localWorkplaceMap.clear();

        contextMap.removeListener(contextMapEventListener);
        contextMap.destroy();
        localContextMap.clear();
        localWorkplaceMemberMap.clear();

        log.info("Stopped");
    }

    /**
     * Stores the workplace in the workplace consistent map under the given name.
     *
     * @param name workplace name used as map key
     * @param workplace workplace to store
     * @throws StorageException declared for failures of the underlying map
     */
    @Override
    public void registerWorkplace(String name, Workplace workplace) throws StorageException {
        workplaceMap.put(name, workplace);
    }

    /**
     * Removes every context currently indexed under the workplace, then removes
     * the workplace itself from the workplace consistent map.
     *
     * @param name workplace name
     * @throws StorageException declared for failures of the underlying map
     */
    @Override
    public void removeWorkplace(String name) throws StorageException {
        removeWorkplaceContexts(name);
        workplaceMap.remove(name);
    }

    /**
     * Looks up a workplace in the local cache.
     *
     * @param name workplace name
     * @return cached workplace, or null if the local cache has no entry for the name
     * @throws StorageException declared by the store interface
     */
    @Override
    public Workplace getWorkplace(String name) throws StorageException {
        return localWorkplaceMap.get(name);
    }

    /**
     * Returns a snapshot of all locally cached workplaces.
     *
     * @return immutable copy of the cached workplaces
     * @throws StorageException declared by the store interface
     */
    @Override
    public Collection<Workplace> getWorkplaces() throws StorageException {
        return ImmutableList.copyOf(localWorkplaceMap.values());
    }

    /**
     * Sets the trigger-next flag of the workplace to {@code handleEvent} and writes
     * the workplace to the consistent map, whether or not an entry already exists.
     *
     * @param name workplace name used as map key
     * @param workplace workplace to commit
     * @param handleEvent value passed to {@code workplace.setTriggerNext}
     * @throws StorageException declared for failures of the underlying map
     */
    @Override
    public void commitWorkplace(String name, Workplace workplace, boolean handleEvent) throws StorageException {
        workplace.setTriggerNext(handleEvent);
        // A single put covers both "replace existing" and "register new". The previous
        // containsKey-then-replace sequence could silently drop the commit when the key
        // was removed between the two calls.
        registerWorkplace(name, workplace);
    }

    /**
     * Stores the workflow context in the context consistent map under the given name.
     *
     * @param name context name used as map key
     * @param context workflow context to store
     * @throws StorageException declared for failures of the underlying map
     */
    @Override
    public void registerContext(String name, WorkflowContext context) throws StorageException {
        contextMap.put(name, context);
    }

    /**
     * Removes the workflow context with the given name from the context consistent map.
     *
     * @param name context name
     * @throws StorageException declared for failures of the underlying map
     */
    @Override
    public void removeContext(String name) throws StorageException {
        contextMap.remove(name);
    }

    /**
     * Looks up a workflow context in the local cache.
     *
     * @param name context name
     * @return cached context, or null if the local cache has no entry for the name
     * @throws StorageException declared by the store interface
     */
    @Override
    public WorkflowContext getContext(String name) throws StorageException {
        return localContextMap.get(name);
    }

    /**
     * Sets the trigger-next flag of the context to {@code handleEvent} and writes
     * the context to the consistent map, whether or not an entry already exists.
     *
     * @param name context name used as map key
     * @param context workflow context to commit
     * @param handleEvent value passed to {@code context.setTriggerNext}
     * @throws StorageException declared for failures of the underlying map
     */
    @Override
    public void commitContext(String name, WorkflowContext context, boolean handleEvent) throws StorageException {
        context.setTriggerNext(handleEvent);
        // Single put instead of containsKey-then-replace; see commitWorkplace for the rationale.
        registerContext(name, context);
    }

    /**
     * Returns a snapshot of all locally cached workflow contexts.
     *
     * @return immutable copy of the cached contexts
     * @throws StorageException declared by the store interface
     */
    @Override
    public Collection<WorkflowContext> getContexts() throws StorageException {
        return ImmutableList.copyOf(localContextMap.values());
    }

    /**
     * Returns a snapshot of the contexts indexed under the given workplace.
     *
     * @param workplaceName workplace name
     * @return immutable copy of the member contexts; empty if the workplace has no indexed context
     */
    @Override
    public Collection<WorkflowContext> getWorkplaceContexts(String workplaceName) {
        Map<String, WorkflowContext> members = localWorkplaceMemberMap.get(workplaceName);
        if (members == null) {
            return Collections.emptyList();
        }

        // Member maps are synchronized maps; traversing a collection view requires
        // holding the map's own lock so the listener cannot mutate it mid-copy.
        synchronized (members) {
            return ImmutableList.copyOf(members.values());
        }
    }

    /**
     * Removes every context currently indexed under the given workplace from the
     * context consistent map.
     *
     * @param workplaceName workplace name
     */
    @Override
    public void removeWorkplaceContexts(String workplaceName) {
        for (WorkflowContext ctx : getWorkplaceContexts(workplaceName)) {
            removeContext(ctx.name());
        }
    }

    /**
     * Listener of the workplace consistent map. Mirrors map events into the local
     * workplace cache and forwards them to the store delegate.
     */
    private class WorkplaceMapListener implements MapEventListener<String, WorkflowData> {

        /**
         * Applies the event to the local cache, then notifies the delegate with a
         * {@link WorkflowDataEvent} of the matching type. Event types other than
         * INSERT, UPDATE and REMOVE are ignored.
         *
         * @param event workplace map event
         */
        @Override
        public void event(MapEvent<String, WorkflowData> event) {

            Workplace newWorkplace = (Workplace) Versioned.valueOrNull(event.newValue());
            Workplace oldWorkplace = (Workplace) Versioned.valueOrNull(event.oldValue());

            log.info("WorkplaceMap event: {}", event);
            switch (event.type()) {
                case INSERT:
                    insert(newWorkplace);
                    notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.INSERT, newWorkplace));
                    break;
                case UPDATE:
                    update(newWorkplace);
                    notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.UPDATE, newWorkplace));
                    break;
                case REMOVE:
                    remove(oldWorkplace);
                    notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.REMOVE, oldWorkplace));
                    break;
                default:
            }
        }

        /**
         * Puts the workplace into the local cache under its own name.
         *
         * @param workplace workplace
         */
        private void insert(Workplace workplace) {
            localWorkplaceMap.put(workplace.name(), workplace);
        }

        /**
         * Stores the updated workplace in the local cache. Uses put rather than replace
         * so an update for a name that is not cached yet is not lost; this matches how
         * the context listener handles updates.
         *
         * @param workplace workplace
         */
        private void update(Workplace workplace) {
            localWorkplaceMap.put(workplace.name(), workplace);
        }

        /**
         * Drops the workplace from the local cache.
         *
         * @param workplace workplace
         */
        private void remove(Workplace workplace) {
            localWorkplaceMap.remove(workplace.name());
        }

        /**
         * Copies every value currently in the workplace consistent map into the local cache.
         */
        public void syncLocal() {
            for (Versioned<WorkflowData> versioned : workplaceMap.values()) {
                insert((Workplace) versioned.value());
            }
        }
    }

    /**
     * Listener of the workflow context consistent map. Mirrors map events into the
     * local context cache and the workplace membership index, and forwards them to
     * the store delegate.
     */
    private class WorkflowContextMapListener implements MapEventListener<String, WorkflowData> {

        /**
         * Applies the event to the local caches, then notifies the delegate with a
         * {@link WorkflowDataEvent} of the matching type. Event types other than
         * INSERT, UPDATE and REMOVE are ignored.
         *
         * @param event workflow context map event
         */
        @Override
        public void event(MapEvent<String, WorkflowData> event) {

            WorkflowContext newContext = (WorkflowContext) Versioned.valueOrNull(event.newValue());
            WorkflowContext oldContext = (WorkflowContext) Versioned.valueOrNull(event.oldValue());

            log.info("WorkflowContext event: {}", event);
            switch (event.type()) {
                case INSERT:
                    insert(newContext);
                    notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.INSERT, newContext));
                    break;
                case UPDATE:
                    update(newContext);
                    notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.UPDATE, newContext));
                    break;
                case REMOVE:
                    remove(oldContext);
                    notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.REMOVE, oldContext));
                    break;
                default:
            }
        }

        /**
         * Inserts workflow context on local hash map.
         * @param context workflow context
         */
        private void insert(WorkflowContext context) {
            store(context);
        }

        /**
         * Updates workflow context on local hash map.
         * @param context workflow context
         */
        private void update(WorkflowContext context) {
            store(context);
        }

        /**
         * Records the context in the membership index of its workplace and in the
         * local context cache. Shared by insert and update, which behave identically.
         * @param context workflow context
         */
        private void store(WorkflowContext context) {
            String contextName = context.name();

            // Create-if-missing and put happen inside one compute() call on the outer
            // concurrent map, so they cannot interleave with the removal in remove().
            localWorkplaceMemberMap.compute(context.workplaceName(), (key, members) -> {
                Map<String, WorkflowContext> target = members;
                if (target == null) {
                    target = Collections.synchronizedMap(new HashMap<String, WorkflowContext>());
                }
                target.put(contextName, context);
                return target;
            });

            localContextMap.put(contextName, context);
        }

        /**
         * Removes workflow context from local hash map.
         * @param context workflow context
         */
        private void remove(WorkflowContext context) {
            String contextName = context.name();
            localContextMap.remove(contextName);

            String workplaceName = context.workplaceName();
            if (!localWorkplaceMemberMap.containsKey(workplaceName)) {
                log.error("remove-context: Failed to find workplace({}) in localWorkplaceMemberMap", workplaceName);
                return;
            }

            // Returning null from the remapping function drops the workplace entry once
            // its last member is gone; doing so inside computeIfPresent() keeps the
            // emptiness check and the removal in one step.
            localWorkplaceMemberMap.computeIfPresent(workplaceName, (key, members) -> {
                members.remove(contextName);
                return members.isEmpty() ? null : members;
            });
        }

        /**
         * Synchronizes local hash map.
         */
        public void syncLocal() {
            for (Versioned<WorkflowData> versioned : contextMap.values()) {
                insert((WorkflowContext) versioned.value());
            }
        }
    }
}
