[ONOS-7732] Automating switch workflow: annotation based data model injection, applying programming counter, and small fixes
Change-Id: I4092d9c2695bcc8c4e8e01d54c442d3fac284eb6
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowCommand.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowCommand.java
index 3f96ba9..5c0f5de 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowCommand.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowCommand.java
@@ -21,23 +21,22 @@
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.workflow.api.DefaultWorkflowDescription;
+import org.onosproject.workflow.api.WorkflowContext;
+import org.onosproject.workflow.api.WorkflowExecutionService;
import org.onosproject.workflow.api.WorkflowService;
import org.onosproject.workflow.api.WorkflowException;
+import org.onosproject.workflow.api.WorkplaceStore;
-import java.util.Arrays;
import java.util.Objects;
@Service
@Command(scope = "onos", name = "workflow", description = "workflow cli")
public class WorkFlowCommand extends AbstractShellCommand {
- @Argument(index = 0, name = "cmd", description = "command(invoke)", required = true)
+ @Argument(index = 0, name = "cmd", description = "command(invoke|eval)", required = true)
private String cmd = null;
- @Argument (index = 1, name = "id", description = "workflow id(URI)", required = true)
- private String id = null;
-
- @Argument (index = 2, name = "name", description = "workplace name", required = true)
+ @Argument(index = 1, name = "name", description = "workflow context name(workflow@workplace)", required = true)
private String name = null;
@Override
@@ -47,19 +46,26 @@
return;
}
+ if (Objects.isNull(name)) {
+ error("invalid workflow context name");
+ return;
+ }
+
+ String[] tokens = name.split("@");
+ if (tokens != null && tokens.length != 2) {
+ error("invalid workflow context name(workflow@workplace)");
+ return;
+ }
+
+ String workflowId = tokens[0];
+ String workplace = tokens[1];
+
switch (cmd) {
case "invoke":
- if (Objects.isNull(id)) {
- error("invalid workflow id parameter");
- return;
- }
-
- if (Objects.isNull(name)) {
- error("invalid workplace name parameter");
- return;
- }
-
- invoke(id, name);
+ invoke(workflowId, workplace);
+ break;
+ case "eval":
+ eval(name);
break;
default:
print("Unsupported cmd: " + cmd);
@@ -74,15 +80,33 @@
private void invoke(String workflowId, String workplaceName) {
WorkflowService service = get(WorkflowService.class);
- DefaultWorkflowDescription wfDesc = DefaultWorkflowDescription.builder()
- .workplaceName(workplaceName)
- .id(workflowId)
- .data(JsonNodeFactory.instance.objectNode())
- .build();
try {
+ DefaultWorkflowDescription wfDesc = DefaultWorkflowDescription.builder()
+ .workplaceName(workplaceName)
+ .id(workflowId)
+ .data(JsonNodeFactory.instance.objectNode())
+ .build();
+
service.invokeWorkflow(wfDesc);
} catch (WorkflowException e) {
- error(e.getMessage() + ", trace: " + Arrays.asList(e.getStackTrace()));
+ error("Exception: ", e);
}
}
+
+ /**
+ * Evaluates workflow context.
+ * @param workflowContextName workflow context name
+ */
+ private void eval(String workflowContextName) {
+ WorkplaceStore storService = get(WorkplaceStore.class);
+ WorkflowExecutionService execService = get(WorkflowExecutionService.class);
+
+ WorkflowContext context = storService.getContext(workflowContextName);
+ if (context == null) {
+ error("failed to find workflow context {}", workflowContextName);
+ return;
+ }
+ execService.eval(workflowContextName);
+ }
+
}
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowEventMapCommand.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowEventMapCommand.java
index e23e97d..5240252 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowEventMapCommand.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowEventMapCommand.java
@@ -51,7 +51,7 @@
try {
print(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(tree));
} catch (JsonProcessingException e) {
- error(e.getMessage() + ", trace: " + Arrays.asList(e.getStackTrace()));
+ error("Exception: " + e.getMessage() + ", trace: " + Arrays.asList(e.getStackTrace()));
}
break;
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowStoreCommand.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowStoreCommand.java
index f5e795a..d1e37c8 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowStoreCommand.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowStoreCommand.java
@@ -32,7 +32,7 @@
@Argument(index = 0, name = "cmd", description = "command(rm)", required = false)
private String cmd = null;
- @Argument (index = 1, name = "id", description = "workflow id(URI)", required = false)
+ @Argument(index = 1, name = "id", description = "workflow id(URI)", required = false)
private String id = null;
@Override
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowTestCommand.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowTestCommand.java
index b3c34ec..8d13508 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowTestCommand.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkFlowTestCommand.java
@@ -16,6 +16,7 @@
package org.onosproject.workflow.cli;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.karaf.shell.api.action.Argument;
import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.lifecycle.Service;
@@ -34,7 +35,7 @@
@Argument(index = 0, name = "cmd", description = "command(invoke)", required = true)
private String cmd = null;
- @Argument (index = 1, name = "number", description = "number of test", required = true)
+ @Argument(index = 1, name = "number", description = "number of test", required = true)
private String number = null;
@Override
@@ -74,12 +75,16 @@
private void invoke(String workflowId, String workplaceName) {
WorkflowService service = get(WorkflowService.class);
- DefaultWorkflowDescription wfDesc = DefaultWorkflowDescription.builder()
- .workplaceName(workplaceName)
- .id(workflowId)
- .data(JsonNodeFactory.instance.objectNode())
- .build();
+
+ ObjectNode dataModel = JsonNodeFactory.instance.objectNode();
+ dataModel.put("count", 0);
+
try {
+ DefaultWorkflowDescription wfDesc = DefaultWorkflowDescription.builder()
+ .workplaceName(workplaceName)
+ .id(workflowId)
+ .data(dataModel)
+ .build();
service.invokeWorkflow(wfDesc);
} catch (WorkflowException e) {
error(e.getMessage() + "trace: " + Arrays.asList(e.getStackTrace()));
@@ -94,6 +99,8 @@
for (int i = 0; i <= num; i++) {
String wpName = "test-" + i;
invoke("sample.workflow-0", wpName);
+ invoke("sample.workflow-1", wpName);
+ invoke("sample.workflow-2", wpName);
}
}
}
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkplaceStoreCommand.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkplaceStoreCommand.java
index db1a736..1b4ba8a 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkplaceStoreCommand.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/cli/WorkplaceStoreCommand.java
@@ -38,7 +38,7 @@
@Argument(index = 0, name = "cmd", description = "command(add/rm/clear/print)", required = false)
private String cmd = null;
- @Argument (index = 1, name = "name", description = "workspace name", required = false)
+ @Argument(index = 1, name = "name", description = "workspace name", required = false)
private String name = null;
@Option(name = "-f", aliases = "--filter", description = "including filter",
@@ -97,10 +97,10 @@
*/
private void addEmptyWorkplace(String name) {
WorkflowService service = get(WorkflowService.class);
- DefaultWorkplaceDescription wpDesc = DefaultWorkplaceDescription.builder()
- .name(name)
- .build();
try {
+ DefaultWorkplaceDescription wpDesc = DefaultWorkplaceDescription.builder()
+ .name(name)
+ .build();
service.createWorkplace(wpDesc);
} catch (WorkflowException e) {
error(e.getMessage() + ", trace: " + Arrays.asList(e.getStackTrace()));
@@ -125,10 +125,10 @@
*/
private void rmWorkplace(String name) {
WorkflowService service = get(WorkflowService.class);
- DefaultWorkplaceDescription wpDesc = DefaultWorkplaceDescription.builder()
- .name(name)
- .build();
try {
+ DefaultWorkplaceDescription wpDesc = DefaultWorkplaceDescription.builder()
+ .name(name)
+ .build();
service.removeWorkplace(wpDesc);
} catch (WorkflowException e) {
error(e.getMessage() + ", trace: " + Arrays.asList(e.getStackTrace()));
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedContextEventMapTreeStore.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedContextEventMapTreeStore.java
index 1ee2cf0..1b725de 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedContextEventMapTreeStore.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedContextEventMapTreeStore.java
@@ -89,13 +89,13 @@
@Override
public void registerEventMap(String eventType, String eventHint,
- String contextName, String workletType) throws WorkflowException {
+ String contextName, String programCounterString) throws WorkflowException {
DocumentPath dpath = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint, contextName));
String currentWorkletType = completeVersioned(eventMapTree.get(dpath));
if (currentWorkletType == null) {
- complete(eventMapTree.createRecursive(dpath, workletType));
+ complete(eventMapTree.createRecursive(dpath, programCounterString));
} else {
- complete(eventMapTree.replace(dpath, workletType, currentWorkletType));
+ complete(eventMapTree.replace(dpath, programCounterString, currentWorkletType));
}
}
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedWorkplaceStore.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedWorkplaceStore.java
index 31fe044..12de3f0 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedWorkplaceStore.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedWorkplaceStore.java
@@ -52,6 +52,7 @@
import org.onosproject.workflow.api.DefaultWorkplace;
import org.onosproject.workflow.api.DefaultWorkflowContext;
import org.onosproject.workflow.api.JsonDataModelTree;
+import org.onosproject.workflow.api.ProgramCounter;
import org.onosproject.workflow.api.SystemWorkflowContext;
import org.onosproject.workflow.api.WorkflowContext;
import org.onosproject.workflow.api.WorkflowData;
@@ -119,6 +120,7 @@
.register(DefaultWorkflowContext.class)
.register(SystemWorkflowContext.class)
.register(WorkflowState.class)
+ .register(ProgramCounter.class)
.register(DataModelTree.class)
.register(JsonDataModelTree.class)
.register(List.class)
@@ -311,7 +313,7 @@
WorkflowContext newContext = (WorkflowContext) Versioned.valueOrNull(event.newValue());
WorkflowContext oldContext = (WorkflowContext) Versioned.valueOrNull(event.oldValue());
- log.info("WorkflowContext event: {}", event);
+ log.debug("WorkflowContext event: {}", event);
switch (event.type()) {
case INSERT:
insert(newContext);
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
index e4d545d..237f70e 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
@@ -22,10 +22,13 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.store.service.StorageException;
import org.onosproject.workflow.api.DefaultWorkplace;
import org.onosproject.workflow.api.EventHintSupplier;
import org.onosproject.workflow.api.EventTask;
+import org.onosproject.workflow.api.JsonDataModelInjector;
import org.onosproject.workflow.api.JsonDataModelTree;
+import org.onosproject.workflow.api.ProgramCounter;
import org.onosproject.workflow.api.SystemWorkflowContext;
import org.onosproject.workflow.api.EventTimeoutTask;
import org.onosproject.workflow.api.TimeoutTask;
@@ -122,6 +125,8 @@
private TimerChain timerChain = new TimerChain();
+ private JsonDataModelInjector dataModelInjector = new JsonDataModelInjector();
+
public static final String APPID = "org.onosproject.workflow";
private ApplicationId appId;
private NodeId localNodeId;
@@ -177,11 +182,20 @@
try {
Worklet initWorklet = workflow.init(context);
if (initWorklet != null) {
+
+ log.info("{} worklet.process:{}", context.name(), initWorklet.tag());
+ log.trace("{} context: {}", context.name(), context);
+
+ dataModelInjector.inject(initWorklet, context);
initWorklet.process(context);
+ dataModelInjector.inhale(initWorklet, context);
+
+ log.info("{} worklet.process(done): {}", context.name(), initWorklet.tag());
+ log.trace("{} context: {}", context.name(), context);
}
} catch (WorkflowException e) {
- log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ log.error("Exception: ", e);
context.setCause(e.getMessage());
context.setState(WorkflowState.EXCEPTION);
workplaceStore.commitContext(context.name(), context, false);
@@ -192,6 +206,20 @@
}
@Override
+ public void eval(String contextName) {
+
+ final WorkflowContext latestContext = workplaceStore.getContext(contextName);
+ if (latestContext == null) {
+ log.error("Invalid workflow context {}", contextName);
+ return;
+ }
+
+ initWorkletExecution(latestContext);
+
+ workplaceStore.commitContext(latestContext.name(), latestContext, true);
+ }
+
+ @Override
public void eventMapTrigger(Event event, EventHintSupplier supplier) {
if (event.subject() instanceof SystemWorkflowContext) {
@@ -204,7 +232,7 @@
try {
eventHint = supplier.apply(event);
} catch (Throwable e) {
- log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ log.error("Exception: ", e);
return;
}
if (eventHint == null) {
@@ -216,7 +244,7 @@
try {
eventMap = eventMapStore.getEventMap(event.getClass().getName(), eventHint);
} catch (WorkflowException e) {
- log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ log.error("Exception: ", e);
return;
}
@@ -228,28 +256,43 @@
for (Map.Entry<String, String> entry : eventMap.entrySet()) {
String contextName = entry.getKey();
- String workletType = entry.getValue();
+ String strProgramCounter = entry.getValue();
+ ProgramCounter pc;
+ try {
+ pc = ProgramCounter.valueOf(strProgramCounter);
+ } catch (IllegalArgumentException e) {
+ log.error("Exception: ", e);
+ return;
+ }
+
WorkflowContext context = workplaceStore.getContext(contextName);
if (Objects.isNull(context)) {
log.info("Invalid context: {}, event: {}", contextName, event);
continue;
}
- EventTask eventtask = EventTask.builder()
+ EventTask eventtask = null;
+ try {
+ eventtask = EventTask.builder()
.event(event)
.eventHint(eventHint)
.context(context)
- .workletType(workletType)
+ .programCounter(pc)
.build();
+ } catch (WorkflowException e) {
+ log.error("Exception: ", e);
+ }
- log.info("eventtaskAccumulator.add: task: {}", eventtask);
- eventtaskAccumulator.add(eventtask);
+ log.debug("eventtaskAccumulator.add: task: {}", eventtask);
+ if (!Objects.isNull(eventtask)) {
+ eventtaskAccumulator.add(eventtask);
+ }
}
}
@Override
public void registerEventMap(Class<? extends Event> eventType, String eventHint,
- String contextName, String workletType) throws WorkflowException {
- eventMapStore.registerEventMap(eventType.getName(), eventHint, contextName, workletType);
+ String contextName, String programCounterString) throws WorkflowException {
+ eventMapStore.registerEventMap(eventType.getName(), eventHint, contextName, programCounterString);
for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
Map<String, String> eventMap = eventMapStore.getEventMap(eventType.getName(), eventHint);
if (eventMap != null && eventMap.containsKey(contextName)) {
@@ -259,6 +302,7 @@
log.info("sleep {}", i);
Thread.sleep(10L * (i + 1));
} catch (InterruptedException e) {
+ log.error("Exception: ", e);
Thread.currentThread().interrupt();
}
}
@@ -278,7 +322,7 @@
case INSERT:
case UPDATE:
if (dataModelContainer.triggerNext()) {
- log.info("workflowAccumulator.add: {}", dataModelContainer);
+ log.debug("workflowAccumulator.add: {}", dataModelContainer);
workflowAccumulator.add(dataModelContainer);
} else {
log.debug("pass-workflowAccumulator.add: {}", dataModelContainer);
@@ -430,33 +474,60 @@
}
try {
- Worklet worklet = workflow.getWorkletInstance(task.workletType());
- if (!Objects.equals(latestContext.current(), worklet.tag())) {
+ if (!Objects.equals(latestContext.current(), task.programCounter())) {
log.error("Current worklet({}) is not mismatched with task work({}). Ignored.",
- latestContext.current(), worklet.tag());
+ latestContext.current(), task.programCounter());
return task;
}
- if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
+ Worklet worklet = workflow.getWorkletInstance(task.programCounter().workletType());
+ if (Worklet.Common.COMPLETED.equals(worklet) || Worklet.Common.INIT.equals(worklet)) {
log.error("Current worklet is {}, Ignored", worklet);
return task;
}
initWorkletExecution(latestContext);
- log.info("processHandlerTask.isCompleted-task:{}, latest:{}", task, latestContext);
- if (worklet.isCompleted(latestContext, task.event())) {
+ log.info("{} worklet.isCompleted:{}", latestContext.name(), worklet.tag());
+ log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
+
+ dataModelInjector.inject(worklet, latestContext);
+ boolean completed = worklet.isCompleted(latestContext, task.event());
+ dataModelInjector.inhale(worklet, latestContext);
+
+ if (completed) {
+
+ log.info("{} worklet.isCompleted(true):{}", latestContext.name(), worklet.tag());
+ log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
+
eventMapStore.unregisterEventMap(
task.eventType(), task.eventHint(), latestContext.name());
+ //completed case
+ // increase program counter
+ ProgramCounter pc = latestContext.current();
+ latestContext.setCurrent(workflow.increased(pc));
+
workplaceStore.commitContext(latestContext.name(), latestContext, true);
return null;
} else {
+
+ log.info("{} worklet.isCompleted(false):{}", latestContext.name(), worklet.tag());
+ log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
+
workplaceStore.commitContext(latestContext.name(), latestContext, false);
}
} catch (WorkflowException e) {
- log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ log.error("Exception: ", e);
+ latestContext.setCause(e.getMessage());
+ latestContext.setState(WorkflowState.EXCEPTION);
+ workplaceStore.commitContext(latestContext.name(), latestContext, false);
+ } catch (StorageException e) {
+ log.error("Exception: ", e);
+ // StorageException does not commit context.
+ } catch (Exception e) {
+ log.error("Exception: ", e);
latestContext.setCause(e.getMessage());
latestContext.setState(WorkflowState.EXCEPTION);
workplaceStore.commitContext(latestContext.name(), latestContext, false);
@@ -491,7 +562,7 @@
log.debug("execEventTimeoutTask- task: {}, hash: {}", task, stringHash(task.context().distributor()));
- WorkflowContext context = task.context();
+ WorkflowContext context = (WorkflowContext) (task.context());
Workflow workflow = workflowStore.get(context.workflowId());
if (workflow == null) {
log.error("execEventTimeoutTask: Invalid workflow {}", context.workflowId());
@@ -505,13 +576,13 @@
}
try {
- Worklet worklet = workflow.getWorkletInstance(task.workletType());
- if (!Objects.equals(latestContext.current(), worklet.tag())) {
+ if (!Objects.equals(latestContext.current(), task.programCounter())) {
log.error("execEventTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
- latestContext.current(), worklet.tag());
+ latestContext.current(), task.programCounter());
return task;
}
+ Worklet worklet = workflow.getWorkletInstance(task.programCounter().workletType());
if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
log.error("execEventTimeoutTask: Current worklet is {}, Ignored", worklet);
return task;
@@ -519,15 +590,32 @@
initWorkletExecution(latestContext);
- log.info("execEventTimeoutTask.timeout-task:{}, latest:{}", task, latestContext);
eventMapStore.unregisterEventMap(
task.eventType(), task.eventHint(), latestContext.name());
+ log.info("{} worklet.timeout(for event):{}", latestContext.name(), worklet.tag());
+ log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
+
+ dataModelInjector.inject(worklet, latestContext);
worklet.timeout(latestContext);
+ dataModelInjector.inhale(worklet, latestContext);
+
+ log.info("{} worklet.timeout(for event)(done):{}", latestContext.name(), worklet.tag());
+ log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
+
+
workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
} catch (WorkflowException e) {
- log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ log.error("Exception: ", e);
+ latestContext.setCause(e.getMessage());
+ latestContext.setState(WorkflowState.EXCEPTION);
+ workplaceStore.commitContext(latestContext.name(), latestContext, false);
+ } catch (StorageException e) {
+ log.error("Exception: ", e);
+ // StorageException does not commit context.
+ } catch (Exception e) {
+ log.error("Exception: ", e);
latestContext.setCause(e.getMessage());
latestContext.setState(WorkflowState.EXCEPTION);
workplaceStore.commitContext(latestContext.name(), latestContext, false);
@@ -559,13 +647,13 @@
}
try {
- Worklet worklet = workflow.getWorkletInstance(task.workletType());
- if (!Objects.equals(latestContext.current(), worklet.tag())) {
+ if (!Objects.equals(latestContext.current(), task.programCounter())) {
log.error("execTimeoutTask: Current worklet({}) is not mismatched with task work({}). Ignored.",
- latestContext.current(), worklet.tag());
+ latestContext.current(), task.programCounter());
return task;
}
+ Worklet worklet = workflow.getWorkletInstance(task.programCounter().workletType());
if (worklet == Worklet.Common.COMPLETED || worklet == Worklet.Common.INIT) {
log.error("execTimeoutTask: Current worklet is {}, Ignored", worklet);
return task;
@@ -573,11 +661,28 @@
initWorkletExecution(latestContext);
+ log.info("{} worklet.timeout:{}", latestContext.name(), worklet.tag());
+ log.trace("{} context: {}", latestContext.name(), latestContext);
+
+ dataModelInjector.inject(worklet, latestContext);
worklet.timeout(latestContext);
+ dataModelInjector.inhale(worklet, latestContext);
+
+ log.info("{} worklet.timeout(done):{}", latestContext.name(), worklet.tag());
+ log.trace("{} context: {}", latestContext.name(), latestContext);
+
workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
} catch (WorkflowException e) {
- log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ log.error("Exception: ", e);
+ latestContext.setCause(e.getMessage());
+ latestContext.setState(WorkflowState.EXCEPTION);
+ workplaceStore.commitContext(latestContext.name(), latestContext, false);
+ } catch (StorageException e) {
+ log.error("Exception: ", e);
+ // StorageException does not commit context.
+ } catch (Exception e) {
+ log.error("Exception: ", e);
latestContext.setCause(e.getMessage());
latestContext.setState(WorkflowState.EXCEPTION);
workplaceStore.commitContext(latestContext.name(), latestContext, false);
@@ -651,14 +756,15 @@
initWorkletExecution(latestContext);
try {
- final Worklet worklet = workflow.next(latestContext);
+ final ProgramCounter pc = workflow.next(latestContext);
+ final Worklet worklet = workflow.getWorkletInstance(pc.workletType());
if (worklet == Worklet.Common.INIT) {
log.error("workflow.next gave INIT. It cannot be executed (context: {})", context.name());
return latestContext;
}
- latestContext.setCurrent(worklet);
+ latestContext.setCurrent(pc);
if (worklet == Worklet.Common.COMPLETED) {
if (workflow.attributes().contains(REMOVE_AFTER_COMPLETE)) {
@@ -671,8 +777,16 @@
}
}
- log.info("execWorkflowContext.process:{}, {}", worklet.tag(), latestContext);
+ log.info("{} worklet.process:{}", latestContext.name(), worklet.tag());
+ log.trace("{} context: {}", latestContext.name(), latestContext);
+
+ dataModelInjector.inject(worklet, latestContext);
worklet.process(latestContext);
+ dataModelInjector.inhale(worklet, latestContext);
+
+ log.info("{} worklet.process(done): {}", latestContext.name(), worklet.tag());
+ log.trace("{} context: {}", latestContext.name(), latestContext);
+
if (latestContext.completionEventType() != null) {
if (latestContext.completionEventGenerator() == null) {
@@ -683,14 +797,14 @@
}
registerEventMap(latestContext.completionEventType(), latestContext.completionEventHint(),
- latestContext.name(), worklet.tag());
+ latestContext.name(), pc.toString());
latestContext.completionEventGenerator().apply();
if (latestContext.completionEventTimeout() != 0L) {
final EventTimeoutTask eventTimeoutTask = EventTimeoutTask.builder()
.context(latestContext)
- .workletType(worklet.tag())
+ .programCounter(pc)
.eventType(latestContext.completionEventType().getName())
.eventHint(latestContext.completionEventHint())
.build();
@@ -703,20 +817,32 @@
if (latestContext.completionEventTimeout() != 0L) {
final TimeoutTask timeoutTask = TimeoutTask.builder()
.context(latestContext)
- .workletType(worklet.tag())
+ .programCounter(pc)
.build();
timerChain.schedule(latestContext.completionEventTimeout(),
() -> {
eventtaskAccumulator.add(timeoutTask);
});
+ } else {
+ //completed case
+ // increase program counter
+ latestContext.setCurrent(workflow.increased(pc));
}
}
workplaceStore.commitContext(latestContext.name(), latestContext, latestContext.triggerNext());
} catch (WorkflowException e) {
- log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
+ log.error("Exception: ", e);
+ latestContext.setCause(e.getMessage());
+ latestContext.setState(WorkflowState.EXCEPTION);
+ workplaceStore.commitContext(latestContext.name(), latestContext, false);
+ } catch (StorageException e) {
+ log.error("Exception: ", e);
+ // StorageException does not commit context.
+ } catch (Exception e) {
+ log.error("Exception: ", e);
latestContext.setCause(e.getMessage());
latestContext.setState(WorkflowState.EXCEPTION);
workplaceStore.commitContext(latestContext.name(), latestContext, false);
@@ -735,4 +861,4 @@
return null;
}
-}
\ No newline at end of file
+}
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkflowNetConfigListener.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkflowNetConfigListener.java
index 59a57e9..bb7f875 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkflowNetConfigListener.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkflowNetConfigListener.java
@@ -68,7 +68,7 @@
@Override
public boolean isRelevant(NetworkConfigEvent event) {
- return true;
+ return event.config().isPresent() && event.config().get() instanceof WorkflowNetConfig;
}
@Override
@@ -116,7 +116,7 @@
rpcMap.get(rpc.op()).apply(this.workflowService, rpc);
}
} catch (WorkflowException e) {
- e.printStackTrace();
+ log.error("Exception: ", e);
}
}
}
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkplaceWorkflow.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkplaceWorkflow.java
index af62aaa..219436c 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkplaceWorkflow.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkplaceWorkflow.java
@@ -143,6 +143,15 @@
return false;
}
}
+
+ @Override
+ public void timeout(WorkflowContext context) throws WorkflowException {
+ if (!isNext(context)) {
+ context.completed(); //Complete the job of worklet by timeout
+ } else {
+ super.timeout(context);
+ }
+ }
}
public static class CreateWorkflowContext extends AbsWorkflowWorklet {
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/example/SampleWorkflow.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/example/SampleWorkflow.java
index 20b3771..9d4e7b2 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/example/SampleWorkflow.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/example/SampleWorkflow.java
@@ -22,6 +22,7 @@
import org.onosproject.workflow.api.AbstractWorklet;
import org.onosproject.workflow.api.DataModelTree;
import org.onosproject.workflow.api.ImmutableListWorkflow;
+import org.onosproject.workflow.api.JsonDataModel;
import org.onosproject.workflow.api.JsonDataModelTree;
import org.onosproject.workflow.api.Workflow;
import org.onosproject.workflow.api.WorkflowContext;
@@ -122,7 +123,8 @@
*/
public abstract static class AbsSampleWorklet extends AbstractWorklet {
- protected static final String SAMPLE_DATAMODEL_PTR = "/sample/job";
+ protected static final String MODEL_SAMPLE_JOB = "/sample/job";
+ protected static final String MODEL_COUNT = "/count";
/**
* Constructor for sample worklet.
@@ -142,10 +144,10 @@
JsonDataModelTree tree = (JsonDataModelTree) context.data();
JsonNode params = tree.root();
- if (params.at(SAMPLE_DATAMODEL_PTR).getNodeType() == JsonNodeType.MISSING) {
- tree.alloc(SAMPLE_DATAMODEL_PTR, DataModelTree.Nodetype.MAP);
+ if (params.at(MODEL_SAMPLE_JOB).getNodeType() == JsonNodeType.MISSING) {
+ tree.alloc(MODEL_SAMPLE_JOB, DataModelTree.Nodetype.MAP);
}
- return (ObjectNode) params.at(SAMPLE_DATAMODEL_PTR);
+ return (ObjectNode) params.at(MODEL_SAMPLE_JOB);
}
/**
@@ -156,7 +158,7 @@
*/
protected ObjectNode getDataModel(WorkflowContext context) throws WorkflowException {
DataModelTree tree = context.data();
- return ((JsonDataModelTree) tree.subtree(SAMPLE_DATAMODEL_PTR)).rootObject();
+ return ((JsonDataModelTree) tree.subtree(MODEL_SAMPLE_JOB)).rootObject();
}
/**
@@ -167,6 +169,7 @@
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
+ log.error("Exception: ", e);
Thread.currentThread().interrupt();
}
}
@@ -176,6 +179,10 @@
* Class for sample worklet-1.
*/
public static class SampleWorklet1 extends AbsSampleWorklet {
+
+ @JsonDataModel(path = MODEL_COUNT, type = Integer.class)
+ Integer intCount;
+
@Override
public void process(WorkflowContext context) throws WorkflowException {
ObjectNode node = getDataModel(context);
@@ -200,12 +207,17 @@
* Class for sample worklet-2 (using timeout).
*/
public static class SampleWorklet2 extends AbsSampleWorklet {
+
+ @JsonDataModel(path = MODEL_COUNT, type = Integer.class)
+ Integer intCount;
+
@Override
public void process(WorkflowContext context) throws WorkflowException {
ObjectNode node = getDataModel(context);
node.put("work2", "done");
log.info("workflow-process {}-{}", context.workplaceName(), this.getClass().getSimpleName());
sleep(50);
+ intCount++;
context.waitFor(50L); //Timeout will happen after 50 milli seconds.
}
@@ -225,13 +237,17 @@
}
public static class SampleWorklet3 extends AbsSampleWorklet {
+
+ @JsonDataModel(path = MODEL_COUNT, type = Integer.class)
+ Integer intCount;
+
@Override
public void process(WorkflowContext context) throws WorkflowException {
ObjectNode node = getDataModel(context);
node.put("work3", "done");
log.info("workflow-process {}-{}", context.workplaceName(), this.getClass().getSimpleName());
sleep(10);
-
+ intCount++;
context.completed();
}
@@ -245,13 +261,17 @@
}
public static class SampleWorklet4 extends AbsSampleWorklet {
+
+ @JsonDataModel(path = MODEL_COUNT, type = Integer.class)
+ Integer intCount;
+
@Override
public void process(WorkflowContext context) throws WorkflowException {
ObjectNode node = getDataModel(context);
node.put("work4", "done");
log.info("workflow-process {}-{}", context.workplaceName(), this.getClass().getSimpleName());
sleep(10);
-
+ intCount++;
context.completed();
}
@@ -265,13 +285,17 @@
}
public static class SampleWorklet5 extends AbsSampleWorklet {
+
+ @JsonDataModel(path = MODEL_COUNT, type = Integer.class)
+ Integer intCount;
+
@Override
public void process(WorkflowContext context) throws WorkflowException {
ObjectNode node = getDataModel(context);
node.put("work5", "done");
log.info("workflow-process {}-{}", context.workplaceName(), this.getClass().getSimpleName());
sleep(10);
-
+ intCount++;
context.completed();
}
@@ -284,4 +308,4 @@
}
}
-}
\ No newline at end of file
+}