blob: 8cd352e1c37bed21fa36dfc55e4384f4fe15ab04 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.node.JsonNodeFactory;
19import com.fasterxml.jackson.databind.node.ObjectNode;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
nitinanandf3f94c62019-02-08 10:36:39 +053022import org.onosproject.store.service.EventuallyConsistentMap;
23import org.onosproject.store.service.WallClockTimestamp;
Ray Milkeydf521292018-10-04 15:13:33 -070024import org.osgi.service.component.annotations.Activate;
25import org.osgi.service.component.annotations.Component;
26import org.osgi.service.component.annotations.Deactivate;
27import org.osgi.service.component.annotations.Reference;
28import org.osgi.service.component.annotations.ReferenceCardinality;
jaegonkim6a7b5242018-09-12 23:09:42 +090029import org.onlab.util.KryoNamespace;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.store.serializers.KryoNamespaces;
33import org.onosproject.store.service.AsyncDocumentTree;
34import org.onosproject.store.service.DocumentPath;
35import org.onosproject.store.service.IllegalDocumentModificationException;
36import org.onosproject.store.service.NoSuchDocumentPathException;
37import org.onosproject.store.service.Ordering;
38import org.onosproject.store.service.Serializer;
39import org.onosproject.store.service.StorageService;
40import org.onosproject.store.service.Versioned;
41import org.onosproject.workflow.api.ContextEventMapStore;
42import org.onosproject.workflow.api.WorkflowException;
43import org.slf4j.Logger;
44
45import java.util.Map;
46import java.util.Objects;
47import java.util.Optional;
nitinanandf3f94c62019-02-08 10:36:39 +053048import java.util.Set;
jaegonkim6a7b5242018-09-12 23:09:42 +090049import java.util.concurrent.CompletableFuture;
50import java.util.concurrent.ExecutionException;
51
52import static org.slf4j.LoggerFactory.getLogger;
53
Ray Milkeydf521292018-10-04 15:13:33 -070054@Component(immediate = true, service = ContextEventMapStore.class)
jaegonkim6a7b5242018-09-12 23:09:42 +090055public class DistributedContextEventMapTreeStore implements ContextEventMapStore {
56
57 protected static final Logger log = getLogger(DistributedContextEventMapTreeStore.class);
58
Ray Milkeydf521292018-10-04 15:13:33 -070059 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090060 private CoreService coreService;
61
Ray Milkeydf521292018-10-04 15:13:33 -070062 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090063 private StorageService storageService;
64
65 private ApplicationId appId;
66
67 private AsyncDocumentTree<String> eventMapTree;
68
nitinanandf3f94c62019-02-08 10:36:39 +053069 private EventuallyConsistentMap<String, Set<String>> hintSetPerCxtMap;
70
71
jaegonkim6a7b5242018-09-12 23:09:42 +090072 @Activate
73 public void activate() {
74
75 appId = coreService.registerApplication("org.onosproject.contexteventmapstore");
76 log.info("appId=" + appId);
77
78 KryoNamespace eventMapNamespace = KryoNamespace.newBuilder()
79 .register(KryoNamespaces.API)
80 .build();
81
82 eventMapTree = storageService.<String>documentTreeBuilder()
83 .withSerializer(Serializer.using(eventMapNamespace))
84 .withName("context-event-map-store")
85 .withOrdering(Ordering.INSERTION)
86 .buildDocumentTree();
nitinanandf3f94c62019-02-08 10:36:39 +053087
88 hintSetPerCxtMap = storageService.<String, Set<String>>eventuallyConsistentMapBuilder()
89 .withName("workflow-event-hint-per-cxt")
90 .withSerializer(eventMapNamespace)
91 .withTimestampProvider((k, v) -> new WallClockTimestamp())
92 .build();
93
jaegonkim6a7b5242018-09-12 23:09:42 +090094 log.info("Started");
95 }
96
97 @Deactivate
98 public void deactivate() {
99 eventMapTree.destroy();
nitinanandf3f94c62019-02-08 10:36:39 +0530100 hintSetPerCxtMap.destroy();
jaegonkim6a7b5242018-09-12 23:09:42 +0900101 log.info("Stopped");
102 }
103
104 @Override
nitinanandf3f94c62019-02-08 10:36:39 +0530105 public void registerEventMap(String eventType, Set<String> eventHintSet,
jaegonkime0f45b52018-10-09 20:23:26 +0900106 String contextName, String programCounterString) throws WorkflowException {
nitinanandf3f94c62019-02-08 10:36:39 +0530107 for (String eventHint : eventHintSet) {
108 //Insert in eventCxtPerHintMap
109 DocumentPath dpathForCxt = DocumentPath.from(Lists.newArrayList(
110 "root", eventType, eventHint, contextName));
111 String currentWorkletType = completeVersioned(eventMapTree.get(dpathForCxt));
112 if (currentWorkletType == null) {
113 complete(eventMapTree.createRecursive(dpathForCxt, programCounterString));
114 } else {
115 complete(eventMapTree.replace(dpathForCxt, programCounterString, currentWorkletType));
116 }
117 log.trace("RegisterEventMap for eventType:{}, eventSet:{}, contextName:{}, pc:{}",
118 eventType, eventHintSet, contextName, programCounterString);
119
jaegonkim6a7b5242018-09-12 23:09:42 +0900120 }
nitinanandf3f94c62019-02-08 10:36:39 +0530121 hintSetPerCxtMap.put(contextName, eventHintSet);
122 log.trace("RegisterEventMap in hintSetPerCxt for " +
123 "eventType:{}, eventSet:{}, contextName:{}, pc:{}",
124 eventType, eventHintSet, contextName, programCounterString);
jaegonkim6a7b5242018-09-12 23:09:42 +0900125 }
126
127 @Override
nitinanandf3f94c62019-02-08 10:36:39 +0530128 public void unregisterEventMap(String eventType, String contextName)
129 throws WorkflowException {
130
131 Set<String> hints = hintSetPerCxtMap.get(contextName);
132 for (String eventHint : hints) {
133 //Remove from eventCxtPerHintMap
134 complete(eventMapTree.removeNode(DocumentPath.from(Lists.newArrayList(
135 "root", eventType, eventHint, contextName))));
136 log.trace("UnregisterEventMap from eventCxtPerHintMap for eventType:{}, eventSet:{}, contextName:{}",
137 eventType, eventHint, contextName);
138 }
139 hintSetPerCxtMap.remove(contextName);
jaegonkim6a7b5242018-09-12 23:09:42 +0900140 }
141
nitinanandf3f94c62019-02-08 10:36:39 +0530142
jaegonkim6a7b5242018-09-12 23:09:42 +0900143 @Override
nitinanandf3f94c62019-02-08 10:36:39 +0530144 public Map<String, String> getEventMapByHint(String eventType, String eventHint) throws WorkflowException {
145 DocumentPath path = DocumentPath.from(
146 Lists.newArrayList("root", eventType, eventHint));
jaegonkim6a7b5242018-09-12 23:09:42 +0900147 Map<String, Versioned<String>> contexts = complete(eventMapTree.getChildren(path));
148 Map<String, String> eventMap = Maps.newHashMap();
149 if (Objects.isNull(contexts)) {
150 return eventMap;
151 }
152
153 for (Map.Entry<String, Versioned<String>> entry : contexts.entrySet()) {
154 eventMap.put(entry.getKey(), entry.getValue().value());
155 }
nitinanandf3f94c62019-02-08 10:36:39 +0530156 log.trace("getEventMapByHint returns eventMap {} ", eventMap);
jaegonkim6a7b5242018-09-12 23:09:42 +0900157 return eventMap;
158 }
159
160 @Override
nitinanandf3f94c62019-02-08 10:36:39 +0530161 public boolean isEventMapPresent(String contextName) {
162 Map<String, String> eventMap = Maps.newHashMap();
163 Set<String> eventHintSet = hintSetPerCxtMap.get(contextName);
164 if (Objects.nonNull(eventHintSet)) {
165 log.trace("EventMap present for Context:{}", contextName);
166 return true;
167 } else {
168 log.trace("EventMap Doesnt exist for Context:{}", contextName);
169 return false;
170 }
171 }
172
173
174 @Override
jaegonkim6a7b5242018-09-12 23:09:42 +0900175 public Map<String, Versioned<String>> getChildren(String path) throws WorkflowException {
176 DocumentPath dpath = DocumentPath.from(path);
177 Map<String, Versioned<String>> entries = complete(eventMapTree.getChildren(dpath));
178 return entries;
179 }
180
181 @Override
182 public DocumentPath getDocumentPath(String path) throws WorkflowException {
183 DocumentPath dpath = DocumentPath.from(path);
184 return dpath;
185 }
186
187 @Override
188 public ObjectNode asJsonTree() throws WorkflowException {
189
190 DocumentPath rootPath = DocumentPath.from(Lists.newArrayList("root"));
191 Map<String, Versioned<String>> eventmap = complete(eventMapTree.getChildren(rootPath));
192
193 ObjectNode rootNode = JsonNodeFactory.instance.objectNode();
194
195 for (Map.Entry<String, Versioned<String>> eventTypeEntry : eventmap.entrySet()) {
196
197 String eventType = eventTypeEntry.getKey();
198
199 ObjectNode eventTypeNode = JsonNodeFactory.instance.objectNode();
200 rootNode.put(eventType, eventTypeNode);
201
202 DocumentPath eventTypePath = DocumentPath.from(Lists.newArrayList("root", eventType));
203 Map<String, Versioned<String>> hintmap = complete(eventMapTree.getChildren(eventTypePath));
204
205 for (Map.Entry<String, Versioned<String>> hintEntry : hintmap.entrySet()) {
206
207 String hint = hintEntry.getKey();
208
209 ObjectNode hintNode = JsonNodeFactory.instance.objectNode();
210 eventTypeNode.put(hint, hintNode);
211
212 DocumentPath hintPath = DocumentPath.from(Lists.newArrayList("root", eventType, hint));
213 Map<String, Versioned<String>> contextmap = complete(eventMapTree.getChildren(hintPath));
214
215 for (Map.Entry<String, Versioned<String>> ctxtEntry : contextmap.entrySet()) {
216 hintNode.put(ctxtEntry.getKey(), ctxtEntry.getValue().value());
217 }
218 }
219 }
220
221 return rootNode;
222 }
223
224 private <T> T complete(CompletableFuture<T> future) throws WorkflowException {
225 try {
226 return future.get();
227 } catch (InterruptedException e) {
228 Thread.currentThread().interrupt();
229 throw new WorkflowException(e.getCause().getMessage());
230 } catch (ExecutionException e) {
231 if (e.getCause() instanceof IllegalDocumentModificationException) {
232 throw new WorkflowException("Node or parent does not exist or is root or is not a Leaf Node",
233 e.getCause());
234 } else if (e.getCause() instanceof NoSuchDocumentPathException) {
235 return null;
236 } else {
237 throw new WorkflowException("Datastore operation failed", e.getCause());
238 }
239 }
240 }
241
242 private <T> T completeVersioned(CompletableFuture<Versioned<T>> future) throws WorkflowException {
243 return Optional.ofNullable(complete(future))
244 .map(Versioned::value)
245 .orElse(null);
246 }
247}