multi event support for a worklet in workflow app
Change-Id: I3178110da75b26f96f8889acc0dd2c715fc567ec
diff --git a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/ContextEventMapStore.java b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/ContextEventMapStore.java
index 6d0f0fb..4b54cf3 100644
--- a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/ContextEventMapStore.java
+++ b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/ContextEventMapStore.java
@@ -21,6 +21,7 @@
import org.onosproject.store.service.Versioned;
import java.util.Map;
+import java.util.Set;
/**
* WorkflowContext Event Map Store.
@@ -30,36 +31,43 @@
/**
* Registers workflow context event mapping.
* @param eventType the class name of event
- * @param eventHint event hint string value of the event
+ * @param eventHintSet Set of event hint string value of the event
* @param contextName workflow context name
* @param programCounterString the program counter of workflow
* @throws WorkflowException workflow exception
*/
- void registerEventMap(String eventType, String eventHint,
+ void registerEventMap(String eventType, Set<String> eventHintSet,
String contextName, String programCounterString) throws WorkflowException;
/**
* Unregisters workflow context event mapping.
* @param eventType the class name of event
- * @param eventHint event hint string value of the event
* @param contextName workflow context name
* @throws WorkflowException workflow exception
*/
- void unregisterEventMap(String eventType, String eventHint,
+ void unregisterEventMap(String eventType,
String contextName) throws WorkflowException;
/**
* Returns workflow context event mapping.
* @param eventType the class name of event
- * @param eventHint event hint string value of the event
- * @return workflow context event mapping
+ * @param eventHint vent hint string value of the event
+ * @return Map of workflow context and value (program counter)
* @throws WorkflowException workflow exception
*/
- Map<String, String> getEventMap(String eventType, String eventHint) throws WorkflowException;
+ Map<String, String> getEventMapByHint(String eventType,
+ String eventHint) throws WorkflowException;
+
+ /**
+ * Returns true or false depending on existence of eventMap of given context.
+ * @param contextName name of workflow context
+ * @return Boolean true or false depending on existence of eventMap of given context
+ */
+ boolean isEventMapPresent(String contextName);
/**
* Returns child nodes on document tree path.
- * @param path document tree path
+ * @param path document tree path including eventType and Hint
* @return children under document tree path
* @throws WorkflowException workflow exception
*/
diff --git a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/DefaultWorkflowContext.java b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/DefaultWorkflowContext.java
index 05c564a..8f5d3c7 100644
--- a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/DefaultWorkflowContext.java
+++ b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/DefaultWorkflowContext.java
@@ -21,6 +21,8 @@
import org.onosproject.event.Event;
import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
import static org.onosproject.workflow.api.CheckCondition.check;
@@ -60,9 +62,9 @@
private transient Class<? extends Event> completionEventType;
/**
- * Completion event hint.
+ * Completion event hint Set.
*/
- private transient String completionEventHint;
+ private transient Set<String> completionEventHintSet;
/**
* Completion event generator method reference.
@@ -170,7 +172,18 @@
public void waitCompletion(Class<? extends Event> eventType, String eventHint,
WorkExecutor eventGenerator, long timeoutMs) {
this.completionEventType = eventType;
- this.completionEventHint = eventHint;
+ this.completionEventHintSet = new HashSet<>();
+ this.completionEventHintSet.add(eventHint);
+ this.completionEventGenerator = eventGenerator;
+ this.completionEventTimeoutMs = timeoutMs;
+ }
+
+ @Override
+ public void waitAnyCompletion(Class<? extends Event> eventType, Set<String> eventHint,
+ WorkExecutor eventGenerator, long timeoutMs) {
+ this.completionEventType = eventType;
+ this.completionEventHintSet = new HashSet<>();
+ this.completionEventHintSet.addAll(eventHint);
this.completionEventGenerator = eventGenerator;
this.completionEventTimeoutMs = timeoutMs;
}
@@ -186,8 +199,8 @@
}
@Override
- public String completionEventHint() {
- return completionEventHint;
+ public Set<String> completionEventHints() {
+ return completionEventHintSet;
}
@Override
diff --git a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/EventTimeoutTask.java b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/EventTimeoutTask.java
index 9aa870d..2ee2672 100644
--- a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/EventTimeoutTask.java
+++ b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/EventTimeoutTask.java
@@ -17,7 +17,9 @@
import com.google.common.base.MoreObjects;
+import java.util.HashSet;
import java.util.Objects;
+import java.util.Set;
import static org.onosproject.workflow.api.CheckCondition.check;
@@ -32,9 +34,9 @@
private final String eventType;
/**
- * Event hint value for finding target event.
+ * Set of Event hint value for finding target event.
*/
- private final String eventHint;
+ private final Set<String> eventHintSet = new HashSet<>();
/**
* Constructor of EventTimeoutTask.
@@ -43,7 +45,7 @@
private EventTimeoutTask(Builder builder) {
super(builder);
this.eventType = builder.eventType;
- this.eventHint = builder.eventHint;
+ this.eventHintSet.addAll(builder.eventHintSet);
}
/**
@@ -55,11 +57,11 @@
}
/**
- * Gets event hint value for finding target event.
- * @return event hint string
+ * Gets set of event hint value for finding target event.
+ * @return event hint set
*/
- public String eventHint() {
- return eventHint;
+ public Set<String> eventHintSet() {
+ return eventHintSet;
}
@Override
@@ -76,7 +78,7 @@
return false;
}
return Objects.equals(this.eventType(), ((EventTask) obj).eventType())
- && Objects.equals(this.eventHint(), ((EventTask) obj).eventHint());
+ && Objects.equals(this.eventHintSet(), ((EventTask) obj).eventHint());
}
@Override
@@ -85,7 +87,7 @@
.add("context", context())
.add("programCounter", programCounter())
.add("eventType", eventType())
- .add("eventHint", eventHint())
+ .add("eventHint", eventHintSet())
.toString();
}
@@ -107,9 +109,9 @@
private String eventType;
/**
- * Event hint value for finding target event.
+ * Set of Event hint value for finding target event.
*/
- private String eventHint;
+ private Set<String> eventHintSet;
/**
* Sets Event type (Class name of event).
@@ -123,11 +125,11 @@
/**
* Sets event hint string for finding target event.
- * @param eventHint event hint string
+ * @param eventHintSet Set of event hint string
* @return builder of EventTimeoutTask
*/
- public Builder eventHint(String eventHint) {
- this.eventHint = eventHint;
+ public Builder eventHintSet(Set<String> eventHintSet) {
+ this.eventHintSet = eventHintSet;
return this;
}
@@ -150,7 +152,7 @@
*/
public EventTimeoutTask build() throws WorkflowException {
check(eventType != null, "eventType is invalid");
- check(eventHint != null, "eventType is invalid");
+ check(eventHintSet != null, "eventHintSet is invalid");
return new EventTimeoutTask(this);
}
}
diff --git a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowContext.java b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowContext.java
index 9f0b4fe..836524d 100644
--- a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowContext.java
+++ b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowContext.java
@@ -18,6 +18,7 @@
import org.onosproject.event.Event;
import java.net.URI;
+import java.util.Set;
/**
* An abstract class representing WorkflowContext.
@@ -98,6 +99,20 @@
public abstract void waitCompletion(Class<? extends Event> eventType, String eventHint,
WorkExecutor eventGenerator, long timeoutMs);
+
+ /**
+ * Waits an event which has any one of eventHint from Set 'eventHintSet' after executing executor.
+ * If the event happens, Worklet.isCompleted will be called.
+ * If the event does not happen for timeoutMs, Worklet.timeout will be called.
+ * @param eventType the class of event to wait
+ * @param eventHintSet the Set of eventHints of the event to wait
+ * @param eventGenerator a method reference to be executed after executing executor
+ * @param timeoutMs timeout millisecond
+ */
+ public abstract void waitAnyCompletion(Class<? extends Event> eventType, Set<String> eventHintSet,
+ WorkExecutor eventGenerator, long timeoutMs);
+
+
/**
* Waits timeout milliseconds. After timeoutMs Worklet.timeout will be called.
* @param timeoutMs timeout millisecond
@@ -111,10 +126,10 @@
public abstract Class<? extends Event> completionEventType();
/**
- * Returns the event hint string to wait.
- * @return the event hint string
+ * Returns the set of event hint string to wait.
+ * @return the event hint string set
*/
- public abstract String completionEventHint();
+ public abstract Set<String> completionEventHints();
/**
* Returns method reference for generating completion event.
diff --git a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowExecutionService.java b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowExecutionService.java
index 9f609b3..c5e00a7 100644
--- a/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowExecutionService.java
+++ b/apps/workflow/api/src/main/java/org/onosproject/workflow/api/WorkflowExecutionService.java
@@ -18,6 +18,8 @@
import org.onosproject.event.Event;
import org.onosproject.event.ListenerService;
+import java.util.Set;
+
/**
* Interface for workflow execution service.
*/
@@ -45,11 +47,11 @@
/**
* Registers workflow event map.
* @param eventType event type (class name of event)
- * @param eventHint event hint value
+ * @param eventHintSet Set of event hint value
* @param contextName workflow context name to be called by this event map
* @param programCounterString worklet type to be called by this event map
* @throws WorkflowException workflow exception
*/
- void registerEventMap(Class<? extends Event> eventType, String eventHint,
+ void registerEventMap(Class<? extends Event> eventType, Set<String> eventHintSet,
String contextName, String programCounterString) throws WorkflowException;
}
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedContextEventMapTreeStore.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedContextEventMapTreeStore.java
index 1b725de..8cd352e 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedContextEventMapTreeStore.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/DistributedContextEventMapTreeStore.java
@@ -19,6 +19,8 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -43,6 +45,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -63,6 +66,9 @@
private AsyncDocumentTree<String> eventMapTree;
+ private EventuallyConsistentMap<String, Set<String>> hintSetPerCxtMap;
+
+
@Activate
public void activate() {
@@ -78,36 +84,66 @@
.withName("context-event-map-store")
.withOrdering(Ordering.INSERTION)
.buildDocumentTree();
+
+ hintSetPerCxtMap = storageService.<String, Set<String>>eventuallyConsistentMapBuilder()
+ .withName("workflow-event-hint-per-cxt")
+ .withSerializer(eventMapNamespace)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .build();
+
log.info("Started");
}
@Deactivate
public void deactivate() {
eventMapTree.destroy();
+ hintSetPerCxtMap.destroy();
log.info("Stopped");
}
@Override
- public void registerEventMap(String eventType, String eventHint,
+ public void registerEventMap(String eventType, Set<String> eventHintSet,
String contextName, String programCounterString) throws WorkflowException {
- DocumentPath dpath = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint, contextName));
- String currentWorkletType = completeVersioned(eventMapTree.get(dpath));
- if (currentWorkletType == null) {
- complete(eventMapTree.createRecursive(dpath, programCounterString));
- } else {
- complete(eventMapTree.replace(dpath, programCounterString, currentWorkletType));
+ for (String eventHint : eventHintSet) {
+ //Insert in eventCxtPerHintMap
+ DocumentPath dpathForCxt = DocumentPath.from(Lists.newArrayList(
+ "root", eventType, eventHint, contextName));
+ String currentWorkletType = completeVersioned(eventMapTree.get(dpathForCxt));
+ if (currentWorkletType == null) {
+ complete(eventMapTree.createRecursive(dpathForCxt, programCounterString));
+ } else {
+ complete(eventMapTree.replace(dpathForCxt, programCounterString, currentWorkletType));
+ }
+ log.trace("RegisterEventMap for eventType:{}, eventSet:{}, contextName:{}, pc:{}",
+ eventType, eventHintSet, contextName, programCounterString);
+
}
+ hintSetPerCxtMap.put(contextName, eventHintSet);
+ log.trace("RegisterEventMap in hintSetPerCxt for " +
+ "eventType:{}, eventSet:{}, contextName:{}, pc:{}",
+ eventType, eventHintSet, contextName, programCounterString);
}
@Override
- public void unregisterEventMap(String eventType, String eventHint, String contextName) throws WorkflowException {
- DocumentPath contextPath = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint, contextName));
- complete(eventMapTree.removeNode(contextPath));
+ public void unregisterEventMap(String eventType, String contextName)
+ throws WorkflowException {
+
+ Set<String> hints = hintSetPerCxtMap.get(contextName);
+ for (String eventHint : hints) {
+ //Remove from eventCxtPerHintMap
+ complete(eventMapTree.removeNode(DocumentPath.from(Lists.newArrayList(
+ "root", eventType, eventHint, contextName))));
+ log.trace("UnregisterEventMap from eventCxtPerHintMap for eventType:{}, eventSet:{}, contextName:{}",
+ eventType, eventHint, contextName);
+ }
+ hintSetPerCxtMap.remove(contextName);
}
+
@Override
- public Map<String, String> getEventMap(String eventType, String eventHint) throws WorkflowException {
- DocumentPath path = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint));
+ public Map<String, String> getEventMapByHint(String eventType, String eventHint) throws WorkflowException {
+ DocumentPath path = DocumentPath.from(
+ Lists.newArrayList("root", eventType, eventHint));
Map<String, Versioned<String>> contexts = complete(eventMapTree.getChildren(path));
Map<String, String> eventMap = Maps.newHashMap();
if (Objects.isNull(contexts)) {
@@ -117,10 +153,25 @@
for (Map.Entry<String, Versioned<String>> entry : contexts.entrySet()) {
eventMap.put(entry.getKey(), entry.getValue().value());
}
+ log.trace("getEventMapByHint returns eventMap {} ", eventMap);
return eventMap;
}
@Override
+ public boolean isEventMapPresent(String contextName) {
+ Map<String, String> eventMap = Maps.newHashMap();
+ Set<String> eventHintSet = hintSetPerCxtMap.get(contextName);
+ if (Objects.nonNull(eventHintSet)) {
+ log.trace("EventMap present for Context:{}", contextName);
+ return true;
+ } else {
+ log.trace("EventMap Doesnt exist for Context:{}", contextName);
+ return false;
+ }
+ }
+
+
+ @Override
public Map<String, Versioned<String>> getChildren(String path) throws WorkflowException {
DocumentPath dpath = DocumentPath.from(path);
Map<String, Versioned<String>> entries = complete(eventMapTree.getChildren(dpath));
diff --git a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
index 237f70e..b472c5f 100644
--- a/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
+++ b/apps/workflow/app/src/main/java/org/onosproject/workflow/impl/WorkFlowEngine.java
@@ -16,7 +16,6 @@
package org.onosproject.workflow.impl;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.google.common.collect.Lists;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
@@ -64,6 +63,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -242,7 +242,7 @@
}
try {
- eventMap = eventMapStore.getEventMap(event.getClass().getName(), eventHint);
+ eventMap = eventMapStore.getEventMapByHint(event.getClass().getName(), eventHint);
} catch (WorkflowException e) {
log.error("Exception: ", e);
return;
@@ -290,22 +290,25 @@
}
@Override
- public void registerEventMap(Class<? extends Event> eventType, String eventHint,
+ public void registerEventMap(Class<? extends Event> eventType, Set<String> eventHintSet,
String contextName, String programCounterString) throws WorkflowException {
- eventMapStore.registerEventMap(eventType.getName(), eventHint, contextName, programCounterString);
- for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
- Map<String, String> eventMap = eventMapStore.getEventMap(eventType.getName(), eventHint);
- if (eventMap != null && eventMap.containsKey(contextName)) {
- return;
- }
- try {
- log.info("sleep {}", i);
- Thread.sleep(10L * (i + 1));
- } catch (InterruptedException e) {
- log.error("Exception: ", e);
- Thread.currentThread().interrupt();
+ eventMapStore.registerEventMap(eventType.getName(), eventHintSet, contextName, programCounterString);
+ for (String eventHint : eventHintSet) {
+ for (int i = 0; i < MAX_REGISTER_EVENTMAP_WAITS; i++) {
+ Map<String, String> eventMap = eventMapStore.getEventMapByHint(eventType.getName(), eventHint);
+ if (eventMap != null && eventMap.containsKey(contextName)) {
+ break;
+ }
+ try {
+ log.info("sleep {}", i);
+ Thread.sleep(10L * (i + 1));
+ } catch (InterruptedException e) {
+ log.error("Exception: ", e);
+ Thread.currentThread().interrupt();
+ }
}
}
+
}
@Override
@@ -442,19 +445,8 @@
*/
private EventTask execEventTask(EventTask task) {
- Map<String, String> eventMap = null;
- try {
- eventMap = eventMapStore.getEventMap(task.event().getClass().getName(), task.eventHint());
- } catch (WorkflowException e) {
- log.error("Exception: {}, trace: {}", e, Lists.newArrayList(e.getStackTrace()));
- return task;
- }
-
- if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
- return task;
- }
-
- if (Objects.isNull(eventMap.get(task.context().name()))) {
+ if (!eventMapStore.isEventMapPresent(task.context().name())) {
+ log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name());
return task;
}
@@ -501,10 +493,10 @@
log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
eventMapStore.unregisterEventMap(
- task.eventType(), task.eventHint(), latestContext.name());
+ task.eventType(), latestContext.name());
//completed case
- // increase program counter
+ //increase program counter
ProgramCounter pc = latestContext.current();
latestContext.setCurrent(workflow.increased(pc));
@@ -543,20 +535,8 @@
*/
private HandlerTask execEventTimeoutTask(EventTimeoutTask task) {
- Map<String, String> eventMap = null;
- try {
- eventMap = eventMapStore.getEventMap(task.eventType(), task.eventHint());
- } catch (WorkflowException e) {
- log.error("execEventTimeoutTask: Exception: {}, trace: {}",
- e, Lists.newArrayList(e.getStackTrace()));
- return task;
- }
-
- if (Objects.isNull(eventMap) || eventMap.isEmpty()) {
- return task;
- }
-
- if (Objects.isNull(eventMap.get(task.context().name()))) {
+ if (!eventMapStore.isEventMapPresent(task.context().name())) {
+ log.trace("EventMap doesnt exist for taskcontext:{}", task.context().name());
return task;
}
@@ -590,8 +570,7 @@
initWorkletExecution(latestContext);
- eventMapStore.unregisterEventMap(
- task.eventType(), task.eventHint(), latestContext.name());
+ eventMapStore.unregisterEventMap(task.eventType(), latestContext.name());
log.info("{} worklet.timeout(for event):{}", latestContext.name(), worklet.tag());
log.trace("{} task:{}, context: {}", latestContext.name(), task, latestContext);
@@ -796,7 +775,7 @@
throw new WorkflowException(msg);
}
- registerEventMap(latestContext.completionEventType(), latestContext.completionEventHint(),
+ registerEventMap(latestContext.completionEventType(), latestContext.completionEventHints(),
latestContext.name(), pc.toString());
latestContext.completionEventGenerator().apply();
@@ -806,7 +785,7 @@
.context(latestContext)
.programCounter(pc)
.eventType(latestContext.completionEventType().getName())
- .eventHint(latestContext.completionEventHint())
+ .eventHintSet(latestContext.completionEventHints())
.build();
timerChain.schedule(latestContext.completionEventTimeout(),
() -> {