blob: 219436c1e2f24fa8d796de0f18aaa706d67dd82b [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.workflow.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onosproject.event.Event;
import org.onosproject.workflow.api.AbstractWorklet;
import org.onosproject.workflow.api.DefaultWorkflowContext;
import org.onosproject.workflow.api.DefaultWorkplace;
import org.onosproject.workflow.api.ImmutableListWorkflow;
import org.onosproject.workflow.api.JsonDataModelTree;
import org.onosproject.workflow.api.SystemWorkflowContext;
import org.onosproject.workflow.api.Workflow;
import org.onosproject.workflow.api.WorkflowAttribute;
import org.onosproject.workflow.api.WorkflowContext;
import org.onosproject.workflow.api.WorkflowData;
import org.onosproject.workflow.api.WorkflowDataEvent;
import org.onosproject.workflow.api.DefaultWorkflowDescription;
import org.onosproject.workflow.api.WorkflowDescription;
import org.onosproject.workflow.api.WorkflowException;
import org.onosproject.workflow.api.WorkflowExecutionService;
import org.onosproject.workflow.api.WorkflowStore;
import org.onosproject.workflow.api.Workplace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Objects;
import static org.onosproject.workflow.api.WorkflowDataEvent.Type.INSERT;
public class WorkplaceWorkflow {
private static final Logger log = LoggerFactory.getLogger(WorkplaceWorkflow.class);
private WorkflowExecutionService workflowExecutionService;
private WorkflowStore workflowStore;
public WorkplaceWorkflow(WorkflowExecutionService workflowExecutionService,
WorkflowStore workflowStore) {
this.workflowExecutionService = workflowExecutionService;
this.workflowStore = workflowStore;
}
public static final String WF_CREATE_WORKFLOW = "workplace.create-workflow";
public void registerWorkflows() {
Workflow workflow = ImmutableListWorkflow.builder()
.id(URI.create(WF_CREATE_WORKFLOW))
.attribute(WorkflowAttribute.REMOVE_AFTER_COMPLETE)
.init(ChangeDistributor.class.getName())
.chain(CreateWorkplace.class.getName())
.chain(CreateWorkflowContext.class.getName())
.build();
workflowStore.register(workflow);
}
public abstract static class AbsWorkflowWorklet extends AbstractWorklet {
protected WorkflowDescription getWorkflowDesc(WorkflowContext context) throws WorkflowException {
JsonNode root = ((JsonDataModelTree) context.data()).root();
return DefaultWorkflowDescription.valueOf(root);
}
}
public static class ChangeDistributor extends AbsWorkflowWorklet {
@Override
public void process(WorkflowContext context) throws WorkflowException {
String workplaceName = getWorkflowDesc(context).workplaceName();
// Sets workflow job distribution hash value to make this workflow to be executed on the
// same cluster node to execute workplace tasks.
((SystemWorkflowContext) context).setDistributor(workplaceName);
context.completed();
}
}
public static class CreateWorkplace extends AbsWorkflowWorklet {
@Override
public boolean isNext(WorkflowContext context) throws WorkflowException {
WorkflowDescription wfDesc = getWorkflowDesc(context);
Workplace workplace = context.workplaceStore().getWorkplace(wfDesc.workplaceName());
return Objects.isNull(workplace);
}
@Override
public void process(WorkflowContext context) throws WorkflowException {
WorkflowDescription wfDesc = getWorkflowDesc(context);
// creates workplace with empty data model
DefaultWorkplace workplace = new DefaultWorkplace(wfDesc.workplaceName(), new JsonDataModelTree());
log.info("registerWorkplace {}", workplace);
context.waitCompletion(WorkflowDataEvent.class, wfDesc.workplaceName(),
() -> context.workplaceStore().registerWorkplace(wfDesc.workplaceName(), workplace),
60000L
);
}
@Override
public boolean isCompleted(WorkflowContext context, Event event)throws WorkflowException {
if (!(event instanceof WorkflowDataEvent)) {
return false;
}
WorkflowDataEvent wfEvent = (WorkflowDataEvent) event;
WorkflowData wfData = wfEvent.subject();
WorkflowDescription wfDesc = getWorkflowDesc(context);
if (wfData instanceof Workplace
&& Objects.equals(wfData.name(), wfDesc.workplaceName())
&& wfEvent.type() == INSERT) {
log.info("isCompleted(true): event:{}, context:{}, workplace:{}",
event, context, wfDesc.workplaceName());
return true;
} else {
log.info("isCompleted(false) event:{}, context:{}, workplace:{}",
event, context, wfDesc.workplaceName());
return false;
}
}
@Override
public void timeout(WorkflowContext context) throws WorkflowException {
if (!isNext(context)) {
context.completed(); //Complete the job of worklet by timeout
} else {
super.timeout(context);
}
}
}
public static class CreateWorkflowContext extends AbsWorkflowWorklet {
private static final String SUBMITTED = "submitted";
private boolean isSubmitted(WorkflowContext context) throws WorkflowException {
JsonNode node = ((JsonDataModelTree) context.data()).nodeAt("/" + SUBMITTED);
if (!(node instanceof BooleanNode)) {
return false;
}
return node.asBoolean();
}
private void submitTrue(WorkflowContext context) throws WorkflowException {
JsonNode root = ((JsonDataModelTree) context.data()).root();
if (!(root instanceof ObjectNode)) {
throw new WorkflowException("Invalid root node for " + context);
}
((ObjectNode) root).put(SUBMITTED, true);
}
@Override
public boolean isNext(WorkflowContext context) throws WorkflowException {
WorkflowDescription wfDesc = getWorkflowDesc(context);
String contextName = DefaultWorkflowContext.nameBuilder(wfDesc.id(), wfDesc.workplaceName());
if (Objects.isNull(context.workplaceStore().getContext(contextName))) {
return (!isSubmitted(context));
} else {
return false;
}
}
@Override
public void process(WorkflowContext context) throws WorkflowException {
WorkflowDescription wfDesc = getWorkflowDesc(context);
Workplace workplace = context.workplaceStore().getWorkplace(wfDesc.workplaceName());
if (Objects.isNull(workplace)) {
log.error("Failed to find workplace with " + wfDesc.workplaceName());
throw new WorkflowException("Failed to find workplace with " + wfDesc.workplaceName());
}
Workflow workflow = context.workflowStore().get(wfDesc.id());
if (Objects.isNull(workflow)) {
throw new WorkflowException("Failed to find workflow with " + wfDesc.id());
}
//String contextName = DefaultWorkflowContext.nameBuilder(wfDesc.id(), wfDesc.workplaceName());
String contextName = wfDesc.workflowContextName();
if (Objects.nonNull(context.workplaceStore().getContext(contextName))) {
throw new WorkflowException(contextName + " exists already");
}
JsonDataModelTree subTree = new JsonDataModelTree(wfDesc.data());
WorkflowContext buildingContext = workflow.buildContext(workplace, subTree);
log.info("registerContext {}", buildingContext.name());
context.workplaceStore().registerContext(buildingContext.name(), buildingContext);
submitTrue(context);
context.completed();
}
}
}