multi event support for a worklet in workflow app
Change-Id: I3178110da75b26f96f8889acc0dd2c715fc567ec
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
index 237f70e..b472c5f 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
@@ -16,7 +16,6 @@
package org.onosproject.workflow.impl;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.google.common.collect.Lists;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
@@ -64,6 +63,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -242,7 +242,7 @@
}
try {
- eventMap = eventMapStore.getEventMap(event.getClass().getName(), eventHint);
+ eventMap = eventMapStore.getEventMapByHint(event.getClass().getName(), eventHint);
} catch (WorkflowException e) {
log.error("Exception: ", e);
return;
@@ -290,22 +290,25 @@
}
@Override
- public void registerEventMap(Class<? extends Event> eventType, String eventHint,
+ public void registerEventMap(Class<? extends Event> eventType, Set<String> eventHintSet,
String contextName, String programCounterString) throws WorkflowException {
- eventMapStore.registerEventMap(eventType.getName(), eventHint, contextName, programCounterString);
- for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
- Map<String, String> eventMap = eventMapStore.getEventMap(eventType.getName(), eventHint);
- if (eventMap != null && eventMap.containsKey(contextName)) {
- return;
- }
- try {
- log.info("sleep {}", i);
- Thread.sleep(10L * (i + 1));
- } catch (InterruptedException e) {
- log.error("Exception: ", e);
- Thread.currentThread().interrupt();
+ eventMapStore.registerEventMap(eventType.getName(), eventHintSet, contextName, programCounterString);
+ for (String eventHint : eventHintSet) {
+ for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
+ Map<String, String> eventMap = eventMapStore.getEventMapByHint(eventType.getName(), eventHint);
+ if (eventMap != null && eventMap.containsKey(contextName)) {
+ break;
+ }
+ try {
+ log.info("sleep {}", i);
+ Thread.sleep(10L * (i + 1));
+ } catch (InterruptedException e) {
+ log.error("Exception: ", e);
+ Thread.currentThread().interrupt();
+ }
}
}
+
}
@Override
@@ -442,19 +445,8 @@
*/
private EventTask execEventTask(EventTask task) {
- Map<String, String> eventMap = null;
- try {
- eventMap = eventMapStore.getEventMap(task.event().getClass().getName(), task.eventHint());
- } catch (WorkflowException e) {
- log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
- return task;
- }
-
- if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
- return task;
- }
-
- if (Objects.isNull(eventMap.get(task.context().name()))) {
+ if (!eventMapStore.isEventMapPresent(task.context().name())) {
+ log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name());
return task;
}
@@ -501,10 +493,10 @@
log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
eventMapStore.unregisterEventMap(
- task.eventType(), task.eventHint(), latestContext.name());
+ task.eventType(), latestContext.name());
//completed case
- // increase program counter
+ //increase program counter
ProgramCounter pc = latestContext.current();
latestContext.setCurrent(workflow.increased(pc));
@@ -543,20 +535,8 @@
*/
private HandlerTask execEventTimeoutTask(EventTimeoutTask task) {
- Map<String, String> eventMap = null;
- try {
- eventMap = eventMapStore.getEventMap(task.eventType(), task.eventHint());
- } catch (WorkflowException e) {
- log.error("execEventTimeoutTask: Exception: {}, trace: {}",
- e, Lists.newArrayList(e.getStackTrace()));
- return task;
- }
-
- if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
- return task;
- }
-
- if (Objects.isNull(eventMap.get(task.context().name()))) {
+ if (!eventMapStore.isEventMapPresent(task.context().name())) {
+ log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name());
return task;
}
@@ -590,8 +570,7 @@
initWorkletExecution(latestContext);
- eventMapStore.unregisterEventMap(
- task.eventType(), task.eventHint(), latestContext.name());
+ eventMapStore.unregisterEventMap(task.eventType(), latestContext.name());
log.info("{} worklet.timeout(for event):{}", latestContext.name(), worklet.tag());
log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
@@ -796,7 +775,7 @@
throw new WorkflowException(msg);
}
- registerEventMap(latestContext.completionEventType(), latestContext.completionEventHint(),
+ registerEventMap(latestContext.completionEventType(), latestContext.completionEventHints(),
latestContext.name(), pc.toString());
latestContext.completionEventGenerator().apply();
@@ -806,7 +785,7 @@
.context(latestContext)
.programCounter(pc)
.eventType(latestContext.completionEventType().getName())
- .eventHint(latestContext.completionEventHint())
+ .eventHintSet(latestContext.completionEventHints())
.build();
timerChain.schedule(latestContext.completionEventTimeout(),
() -> {