blob: 219436c1e2f24fa8d796de0f18aaa706d67dd82b [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.node.BooleanNode;
20import com.fasterxml.jackson.databind.node.ObjectNode;
21import org.onosproject.event.Event;
22import org.onosproject.workflow.api.AbstractWorklet;
23import org.onosproject.workflow.api.DefaultWorkflowContext;
24import org.onosproject.workflow.api.DefaultWorkplace;
25import org.onosproject.workflow.api.ImmutableListWorkflow;
26import org.onosproject.workflow.api.JsonDataModelTree;
27import org.onosproject.workflow.api.SystemWorkflowContext;
28import org.onosproject.workflow.api.Workflow;
29import org.onosproject.workflow.api.WorkflowAttribute;
30import org.onosproject.workflow.api.WorkflowContext;
31import org.onosproject.workflow.api.WorkflowData;
32import org.onosproject.workflow.api.WorkflowDataEvent;
33import org.onosproject.workflow.api.DefaultWorkflowDescription;
34import org.onosproject.workflow.api.WorkflowDescription;
35import org.onosproject.workflow.api.WorkflowException;
36import org.onosproject.workflow.api.WorkflowExecutionService;
37import org.onosproject.workflow.api.WorkflowStore;
38import org.onosproject.workflow.api.Workplace;
39import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
42import java.net.URI;
43import java.util.Objects;
44
45import static org.onosproject.workflow.api.WorkflowDataEvent.Type.INSERT;
46
47public class WorkplaceWorkflow {
48
49 private static final Logger log = LoggerFactory.getLogger(WorkplaceWorkflow.class);
50
51 private WorkflowExecutionService workflowExecutionService;
52 private WorkflowStore workflowStore;
53
54 public WorkplaceWorkflow(WorkflowExecutionService workflowExecutionService,
55 WorkflowStore workflowStore) {
56 this.workflowExecutionService = workflowExecutionService;
57 this.workflowStore = workflowStore;
58 }
59
60 public static final String WF_CREATE_WORKFLOW = "workplace.create-workflow";
61
62 public void registerWorkflows() {
63
64 Workflow workflow = ImmutableListWorkflow.builder()
65 .id(URI.create(WF_CREATE_WORKFLOW))
66 .attribute(WorkflowAttribute.REMOVE_AFTER_COMPLETE)
67 .init(ChangeDistributor.class.getName())
68 .chain(CreateWorkplace.class.getName())
69 .chain(CreateWorkflowContext.class.getName())
70 .build();
71 workflowStore.register(workflow);
72 }
73
74 public abstract static class AbsWorkflowWorklet extends AbstractWorklet {
75
76 protected WorkflowDescription getWorkflowDesc(WorkflowContext context) throws WorkflowException {
77
78 JsonNode root = ((JsonDataModelTree) context.data()).root();
79 return DefaultWorkflowDescription.valueOf(root);
80 }
81 }
82
83 public static class ChangeDistributor extends AbsWorkflowWorklet {
84
85 @Override
86 public void process(WorkflowContext context) throws WorkflowException {
87
88 String workplaceName = getWorkflowDesc(context).workplaceName();
89 // Sets workflow job distribution hash value to make this workflow to be executed on the
90 // same cluster node to execute workplace tasks.
91 ((SystemWorkflowContext) context).setDistributor(workplaceName);
92
93 context.completed();
94 }
95 }
96
97 public static class CreateWorkplace extends AbsWorkflowWorklet {
98
99 @Override
100 public boolean isNext(WorkflowContext context) throws WorkflowException {
101
102 WorkflowDescription wfDesc = getWorkflowDesc(context);
103
104 Workplace workplace = context.workplaceStore().getWorkplace(wfDesc.workplaceName());
105 return Objects.isNull(workplace);
106 }
107
108 @Override
109 public void process(WorkflowContext context) throws WorkflowException {
110
111 WorkflowDescription wfDesc = getWorkflowDesc(context);
112
113 // creates workplace with empty data model
114 DefaultWorkplace workplace = new DefaultWorkplace(wfDesc.workplaceName(), new JsonDataModelTree());
115 log.info("registerWorkplace {}", workplace);
116 context.waitCompletion(WorkflowDataEvent.class, wfDesc.workplaceName(),
117 () -> context.workplaceStore().registerWorkplace(wfDesc.workplaceName(), workplace),
118 60000L
119 );
120 }
121
122 @Override
123 public boolean isCompleted(WorkflowContext context, Event event)throws WorkflowException {
124
125 if (!(event instanceof WorkflowDataEvent)) {
126 return false;
127 }
128
129 WorkflowDataEvent wfEvent = (WorkflowDataEvent) event;
130 WorkflowData wfData = wfEvent.subject();
131
132 WorkflowDescription wfDesc = getWorkflowDesc(context);
133
134 if (wfData instanceof Workplace
135 && Objects.equals(wfData.name(), wfDesc.workplaceName())
136 && wfEvent.type() == INSERT) {
137 log.info("isCompleted(true): event:{}, context:{}, workplace:{}",
138 event, context, wfDesc.workplaceName());
139 return true;
140 } else {
141 log.info("isCompleted(false) event:{}, context:{}, workplace:{}",
142 event, context, wfDesc.workplaceName());
143 return false;
144 }
145 }
jaegonkime0f45b52018-10-09 20:23:26 +0900146
147 @Override
148 public void timeout(WorkflowContext context) throws WorkflowException {
149 if (!isNext(context)) {
150 context.completed(); //Complete the job of worklet by timeout
151 } else {
152 super.timeout(context);
153 }
154 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900155 }
156
157 public static class CreateWorkflowContext extends AbsWorkflowWorklet {
158
159 private static final String SUBMITTED = "submitted";
160
161 private boolean isSubmitted(WorkflowContext context) throws WorkflowException {
162 JsonNode node = ((JsonDataModelTree) context.data()).nodeAt("/" + SUBMITTED);
163 if (!(node instanceof BooleanNode)) {
164 return false;
165 }
166 return node.asBoolean();
167 }
168
169 private void submitTrue(WorkflowContext context) throws WorkflowException {
170 JsonNode root = ((JsonDataModelTree) context.data()).root();
171 if (!(root instanceof ObjectNode)) {
172 throw new WorkflowException("Invalid root node for " + context);
173 }
174 ((ObjectNode) root).put(SUBMITTED, true);
175 }
176
177 @Override
178 public boolean isNext(WorkflowContext context) throws WorkflowException {
179
180 WorkflowDescription wfDesc = getWorkflowDesc(context);
181
182 String contextName = DefaultWorkflowContext.nameBuilder(wfDesc.id(), wfDesc.workplaceName());
183 if (Objects.isNull(context.workplaceStore().getContext(contextName))) {
184 return (!isSubmitted(context));
185 } else {
186 return false;
187 }
188 }
189
190 @Override
191 public void process(WorkflowContext context) throws WorkflowException {
192
193 WorkflowDescription wfDesc = getWorkflowDesc(context);
194
195 Workplace workplace = context.workplaceStore().getWorkplace(wfDesc.workplaceName());
196 if (Objects.isNull(workplace)) {
197
198 log.error("Failed to find workplace with " + wfDesc.workplaceName());
199 throw new WorkflowException("Failed to find workplace with " + wfDesc.workplaceName());
200 }
201
202 Workflow workflow = context.workflowStore().get(wfDesc.id());
203 if (Objects.isNull(workflow)) {
204 throw new WorkflowException("Failed to find workflow with " + wfDesc.id());
205 }
206
207 //String contextName = DefaultWorkflowContext.nameBuilder(wfDesc.id(), wfDesc.workplaceName());
208 String contextName = wfDesc.workflowContextName();
209 if (Objects.nonNull(context.workplaceStore().getContext(contextName))) {
210 throw new WorkflowException(contextName + " exists already");
211 }
212
213 JsonDataModelTree subTree = new JsonDataModelTree(wfDesc.data());
214 WorkflowContext buildingContext = workflow.buildContext(workplace, subTree);
215 log.info("registerContext {}", buildingContext.name());
216 context.workplaceStore().registerContext(buildingContext.name(), buildingContext);
217 submitTrue(context);
218
219 context.completed();
220 }
221 }
222}