blob: 7bfe01b53e99b66665cf133e7558a7045c8ab17c [file] [log] [blame]
jaegonkim6a7b5242018-09-12 23:09:42 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.node.BooleanNode;
20import com.fasterxml.jackson.databind.node.ObjectNode;
21import org.onosproject.event.Event;
22import org.onosproject.workflow.api.AbstractWorklet;
23import org.onosproject.workflow.api.DefaultWorkflowContext;
24import org.onosproject.workflow.api.DefaultWorkplace;
25import org.onosproject.workflow.api.ImmutableListWorkflow;
nitinanandc8b70252019-04-17 15:35:43 +053026import org.onosproject.workflow.api.JsonDataModelInjector;
jaegonkim6a7b5242018-09-12 23:09:42 +090027import org.onosproject.workflow.api.JsonDataModelTree;
28import org.onosproject.workflow.api.SystemWorkflowContext;
nitinanandc8b70252019-04-17 15:35:43 +053029import org.onosproject.workflow.api.TriggerWorklet;
jaegonkim6a7b5242018-09-12 23:09:42 +090030import org.onosproject.workflow.api.Workflow;
31import org.onosproject.workflow.api.WorkflowAttribute;
32import org.onosproject.workflow.api.WorkflowContext;
33import org.onosproject.workflow.api.WorkflowData;
34import org.onosproject.workflow.api.WorkflowDataEvent;
35import org.onosproject.workflow.api.DefaultWorkflowDescription;
36import org.onosproject.workflow.api.WorkflowDescription;
37import org.onosproject.workflow.api.WorkflowException;
38import org.onosproject.workflow.api.WorkflowExecutionService;
39import org.onosproject.workflow.api.WorkflowStore;
nitinanandc8b70252019-04-17 15:35:43 +053040import org.onosproject.workflow.api.Worklet;
jaegonkim6a7b5242018-09-12 23:09:42 +090041import org.onosproject.workflow.api.Workplace;
42import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
45import java.net.URI;
46import java.util.Objects;
47
48import static org.onosproject.workflow.api.WorkflowDataEvent.Type.INSERT;
49
50public class WorkplaceWorkflow {
51
52 private static final Logger log = LoggerFactory.getLogger(WorkplaceWorkflow.class);
53
54 private WorkflowExecutionService workflowExecutionService;
55 private WorkflowStore workflowStore;
56
57 public WorkplaceWorkflow(WorkflowExecutionService workflowExecutionService,
58 WorkflowStore workflowStore) {
59 this.workflowExecutionService = workflowExecutionService;
60 this.workflowStore = workflowStore;
61 }
62
63 public static final String WF_CREATE_WORKFLOW = "workplace.create-workflow";
64
65 public void registerWorkflows() {
66
67 Workflow workflow = ImmutableListWorkflow.builder()
68 .id(URI.create(WF_CREATE_WORKFLOW))
69 .attribute(WorkflowAttribute.REMOVE_AFTER_COMPLETE)
70 .init(ChangeDistributor.class.getName())
71 .chain(CreateWorkplace.class.getName())
72 .chain(CreateWorkflowContext.class.getName())
73 .build();
74 workflowStore.register(workflow);
75 }
76
77 public abstract static class AbsWorkflowWorklet extends AbstractWorklet {
78
79 protected WorkflowDescription getWorkflowDesc(WorkflowContext context) throws WorkflowException {
80
81 JsonNode root = ((JsonDataModelTree) context.data()).root();
82 return DefaultWorkflowDescription.valueOf(root);
83 }
84 }
85
86 public static class ChangeDistributor extends AbsWorkflowWorklet {
87
88 @Override
89 public void process(WorkflowContext context) throws WorkflowException {
90
91 String workplaceName = getWorkflowDesc(context).workplaceName();
92 // Sets workflow job distribution hash value to make this workflow to be executed on the
93 // same cluster node to execute workplace tasks.
94 ((SystemWorkflowContext) context).setDistributor(workplaceName);
95
96 context.completed();
97 }
98 }
99
100 public static class CreateWorkplace extends AbsWorkflowWorklet {
101
102 @Override
103 public boolean isNext(WorkflowContext context) throws WorkflowException {
104
105 WorkflowDescription wfDesc = getWorkflowDesc(context);
106
107 Workplace workplace = context.workplaceStore().getWorkplace(wfDesc.workplaceName());
108 return Objects.isNull(workplace);
109 }
110
111 @Override
112 public void process(WorkflowContext context) throws WorkflowException {
113
114 WorkflowDescription wfDesc = getWorkflowDesc(context);
115
116 // creates workplace with empty data model
117 DefaultWorkplace workplace = new DefaultWorkplace(wfDesc.workplaceName(), new JsonDataModelTree());
118 log.info("registerWorkplace {}", workplace);
119 context.waitCompletion(WorkflowDataEvent.class, wfDesc.workplaceName(),
120 () -> context.workplaceStore().registerWorkplace(wfDesc.workplaceName(), workplace),
121 60000L
122 );
123 }
124
125 @Override
126 public boolean isCompleted(WorkflowContext context, Event event)throws WorkflowException {
127
128 if (!(event instanceof WorkflowDataEvent)) {
129 return false;
130 }
131
132 WorkflowDataEvent wfEvent = (WorkflowDataEvent) event;
133 WorkflowData wfData = wfEvent.subject();
134
135 WorkflowDescription wfDesc = getWorkflowDesc(context);
136
137 if (wfData instanceof Workplace
138 && Objects.equals(wfData.name(), wfDesc.workplaceName())
139 && wfEvent.type() == INSERT) {
140 log.info("isCompleted(true): event:{}, context:{}, workplace:{}",
141 event, context, wfDesc.workplaceName());
142 return true;
143 } else {
144 log.info("isCompleted(false) event:{}, context:{}, workplace:{}",
145 event, context, wfDesc.workplaceName());
146 return false;
147 }
148 }
jaegonkime0f45b52018-10-09 20:23:26 +0900149
150 @Override
151 public void timeout(WorkflowContext context) throws WorkflowException {
152 if (!isNext(context)) {
153 context.completed(); //Complete the job of worklet by timeout
154 } else {
155 super.timeout(context);
156 }
157 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900158 }
159
160 public static class CreateWorkflowContext extends AbsWorkflowWorklet {
161
162 private static final String SUBMITTED = "submitted";
163
164 private boolean isSubmitted(WorkflowContext context) throws WorkflowException {
165 JsonNode node = ((JsonDataModelTree) context.data()).nodeAt("/" + SUBMITTED);
166 if (!(node instanceof BooleanNode)) {
167 return false;
168 }
169 return node.asBoolean();
170 }
171
172 private void submitTrue(WorkflowContext context) throws WorkflowException {
173 JsonNode root = ((JsonDataModelTree) context.data()).root();
174 if (!(root instanceof ObjectNode)) {
175 throw new WorkflowException("Invalid root node for " + context);
176 }
177 ((ObjectNode) root).put(SUBMITTED, true);
178 }
179
180 @Override
181 public boolean isNext(WorkflowContext context) throws WorkflowException {
182
183 WorkflowDescription wfDesc = getWorkflowDesc(context);
184
185 String contextName = DefaultWorkflowContext.nameBuilder(wfDesc.id(), wfDesc.workplaceName());
186 if (Objects.isNull(context.workplaceStore().getContext(contextName))) {
187 return (!isSubmitted(context));
188 } else {
189 return false;
190 }
191 }
192
193 @Override
194 public void process(WorkflowContext context) throws WorkflowException {
195
196 WorkflowDescription wfDesc = getWorkflowDesc(context);
197
198 Workplace workplace = context.workplaceStore().getWorkplace(wfDesc.workplaceName());
199 if (Objects.isNull(workplace)) {
200
201 log.error("Failed to find workplace with " + wfDesc.workplaceName());
202 throw new WorkflowException("Failed to find workplace with " + wfDesc.workplaceName());
203 }
204
205 Workflow workflow = context.workflowStore().get(wfDesc.id());
206 if (Objects.isNull(workflow)) {
207 throw new WorkflowException("Failed to find workflow with " + wfDesc.id());
208 }
209
210 //String contextName = DefaultWorkflowContext.nameBuilder(wfDesc.id(), wfDesc.workplaceName());
211 String contextName = wfDesc.workflowContextName();
212 if (Objects.nonNull(context.workplaceStore().getContext(contextName))) {
213 throw new WorkflowException(contextName + " exists already");
214 }
215
216 JsonDataModelTree subTree = new JsonDataModelTree(wfDesc.data());
217 WorkflowContext buildingContext = workflow.buildContext(workplace, subTree);
218 log.info("registerContext {}", buildingContext.name());
219 context.workplaceStore().registerContext(buildingContext.name(), buildingContext);
nitinanandc8b70252019-04-17 15:35:43 +0530220
221 if (workflow.getTriggerWorkletClassName().isPresent()) {
222 String triggerWorkletName = workflow.getTriggerWorkletClassName().get();
223 Worklet worklet = workflow.getTriggerWorkletInstance(triggerWorkletName);
224 if (worklet instanceof TriggerWorklet) {
225 buildingContext.setEventMapStore(context.eventMapStore());
226 JsonDataModelInjector dataModelInjector = new JsonDataModelInjector();
227 dataModelInjector.inject(worklet, buildingContext);
228 ((TriggerWorklet) worklet).register(buildingContext);
229 }
230 }
jaegonkim6a7b5242018-09-12 23:09:42 +0900231 submitTrue(context);
232
233 context.completed();
234 }
235 }
236}