blob: af62aaab60b091d3d190b83437b43972e8879cf9 [file] [log] [blame]
jaegonkim6a7b5242018-09-12 23:09:42 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.node.BooleanNode;
20import com.fasterxml.jackson.databind.node.ObjectNode;
21import org.onosproject.event.Event;
22import org.onosproject.workflow.api.AbstractWorklet;
23import org.onosproject.workflow.api.DefaultWorkflowContext;
24import org.onosproject.workflow.api.DefaultWorkplace;
25import org.onosproject.workflow.api.ImmutableListWorkflow;
26import org.onosproject.workflow.api.JsonDataModelTree;
27import org.onosproject.workflow.api.SystemWorkflowContext;
28import org.onosproject.workflow.api.Workflow;
29import org.onosproject.workflow.api.WorkflowAttribute;
30import org.onosproject.workflow.api.WorkflowContext;
31import org.onosproject.workflow.api.WorkflowData;
32import org.onosproject.workflow.api.WorkflowDataEvent;
33import org.onosproject.workflow.api.DefaultWorkflowDescription;
34import org.onosproject.workflow.api.WorkflowDescription;
35import org.onosproject.workflow.api.WorkflowException;
36import org.onosproject.workflow.api.WorkflowExecutionService;
37import org.onosproject.workflow.api.WorkflowStore;
38import org.onosproject.workflow.api.Workplace;
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
42import java.net.URI;
43import java.util.Objects;
44
45import static org.onosproject.workflow.api.WorkflowDataEvent.Type.INSERT;
46
47public class WorkplaceWorkflow {
48
49 private static final Logger log = LoggerFactory.getLogger(WorkplaceWorkflow.class);
50
51 private WorkflowExecutionService workflowExecutionService;
52 private WorkflowStore workflowStore;
53
54 public WorkplaceWorkflow(WorkflowExecutionService workflowExecutionService,
55 WorkflowStore workflowStore) {
56 this.workflowExecutionService = workflowExecutionService;
57 this.workflowStore = workflowStore;
58 }
59
60 public static final String WF_CREATE_WORKFLOW = "workplace.create-workflow";
61
62 public void registerWorkflows() {
63
64 Workflow workflow = ImmutableListWorkflow.builder()
65 .id(URI.create(WF_CREATE_WORKFLOW))
66 .attribute(WorkflowAttribute.REMOVE_AFTER_COMPLETE)
67 .init(ChangeDistributor.class.getName())
68 .chain(CreateWorkplace.class.getName())
69 .chain(CreateWorkflowContext.class.getName())
70 .build();
71 workflowStore.register(workflow);
72 }
73
74 public abstract static class AbsWorkflowWorklet extends AbstractWorklet {
75
76 protected WorkflowDescription getWorkflowDesc(WorkflowContext context) throws WorkflowException {
77
78 JsonNode root = ((JsonDataModelTree) context.data()).root();
79 return DefaultWorkflowDescription.valueOf(root);
80 }
81 }
82
83 public static class ChangeDistributor extends AbsWorkflowWorklet {
84
85 @Override
86 public void process(WorkflowContext context) throws WorkflowException {
87
88 String workplaceName = getWorkflowDesc(context).workplaceName();
89 // Sets workflow job distribution hash value to make this workflow to be executed on the
90 // same cluster node to execute workplace tasks.
91 ((SystemWorkflowContext) context).setDistributor(workplaceName);
92
93 context.completed();
94 }
95 }
96
97 public static class CreateWorkplace extends AbsWorkflowWorklet {
98
99 @Override
100 public boolean isNext(WorkflowContext context) throws WorkflowException {
101
102 WorkflowDescription wfDesc = getWorkflowDesc(context);
103
104 Workplace workplace = context.workplaceStore().getWorkplace(wfDesc.workplaceName());
105 return Objects.isNull(workplace);
106 }
107
108 @Override
109 public void process(WorkflowContext context) throws WorkflowException {
110
111 WorkflowDescription wfDesc = getWorkflowDesc(context);
112
113 // creates workplace with empty data model
114 DefaultWorkplace workplace = new DefaultWorkplace(wfDesc.workplaceName(), new JsonDataModelTree());
115 log.info("registerWorkplace {}", workplace);
116 context.waitCompletion(WorkflowDataEvent.class, wfDesc.workplaceName(),
117 () -> context.workplaceStore().registerWorkplace(wfDesc.workplaceName(), workplace),
118 60000L
119 );
120 }
121
122 @Override
123 public boolean isCompleted(WorkflowContext context, Event event)throws WorkflowException {
124
125 if (!(event instanceof WorkflowDataEvent)) {
126 return false;
127 }
128
129 WorkflowDataEvent wfEvent = (WorkflowDataEvent) event;
130 WorkflowData wfData = wfEvent.subject();
131
132 WorkflowDescription wfDesc = getWorkflowDesc(context);
133
134 if (wfData instanceof Workplace
135 && Objects.equals(wfData.name(), wfDesc.workplaceName())
136 && wfEvent.type() == INSERT) {
137 log.info("isCompleted(true): event:{}, context:{}, workplace:{}",
138 event, context, wfDesc.workplaceName());
139 return true;
140 } else {
141 log.info("isCompleted(false) event:{}, context:{}, workplace:{}",
142 event, context, wfDesc.workplaceName());
143 return false;
144 }
145 }
146 }
147
148 public static class CreateWorkflowContext extends AbsWorkflowWorklet {
149
150 private static final String SUBMITTED = "submitted";
151
152 private boolean isSubmitted(WorkflowContext context) throws WorkflowException {
153 JsonNode node = ((JsonDataModelTree) context.data()).nodeAt("/" + SUBMITTED);
154 if (!(node instanceof BooleanNode)) {
155 return false;
156 }
157 return node.asBoolean();
158 }
159
160 private void submitTrue(WorkflowContext context) throws WorkflowException {
161 JsonNode root = ((JsonDataModelTree) context.data()).root();
162 if (!(root instanceof ObjectNode)) {
163 throw new WorkflowException("Invalid root node for " + context);
164 }
165 ((ObjectNode) root).put(SUBMITTED, true);
166 }
167
168 @Override
169 public boolean isNext(WorkflowContext context) throws WorkflowException {
170
171 WorkflowDescription wfDesc = getWorkflowDesc(context);
172
173 String contextName = DefaultWorkflowContext.nameBuilder(wfDesc.id(), wfDesc.workplaceName());
174 if (Objects.isNull(context.workplaceStore().getContext(contextName))) {
175 return (!isSubmitted(context));
176 } else {
177 return false;
178 }
179 }
180
181 @Override
182 public void process(WorkflowContext context) throws WorkflowException {
183
184 WorkflowDescription wfDesc = getWorkflowDesc(context);
185
186 Workplace workplace = context.workplaceStore().getWorkplace(wfDesc.workplaceName());
187 if (Objects.isNull(workplace)) {
188
189 log.error("Failed to find workplace with " + wfDesc.workplaceName());
190 throw new WorkflowException("Failed to find workplace with " + wfDesc.workplaceName());
191 }
192
193 Workflow workflow = context.workflowStore().get(wfDesc.id());
194 if (Objects.isNull(workflow)) {
195 throw new WorkflowException("Failed to find workflow with " + wfDesc.id());
196 }
197
198 //String contextName = DefaultWorkflowContext.nameBuilder(wfDesc.id(), wfDesc.workplaceName());
199 String contextName = wfDesc.workflowContextName();
200 if (Objects.nonNull(context.workplaceStore().getContext(contextName))) {
201 throw new WorkflowException(contextName + " exists already");
202 }
203
204 JsonDataModelTree subTree = new JsonDataModelTree(wfDesc.data());
205 WorkflowContext buildingContext = workflow.buildContext(workplace, subTree);
206 log.info("registerContext {}", buildingContext.name());
207 context.workplaceStore().registerContext(buildingContext.name(), buildingContext);
208 submitTrue(context);
209
210 context.completed();
211 }
212 }
213}