blob: 1ee2cf0bc402381287d037d85057110753a295df [file] [log] [blame]
jaegonkim6a7b5242018-09-12 23:09:42 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.node.JsonNodeFactory;
19import com.fasterxml.jackson.databind.node.ObjectNode;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
Ray Milkeydf521292018-10-04 15:13:33 -070022import org.osgi.service.component.annotations.Activate;
23import org.osgi.service.component.annotations.Component;
24import org.osgi.service.component.annotations.Deactivate;
25import org.osgi.service.component.annotations.Reference;
26import org.osgi.service.component.annotations.ReferenceCardinality;
jaegonkim6a7b5242018-09-12 23:09:42 +090027import org.onlab.util.KryoNamespace;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.store.serializers.KryoNamespaces;
31import org.onosproject.store.service.AsyncDocumentTree;
32import org.onosproject.store.service.DocumentPath;
33import org.onosproject.store.service.IllegalDocumentModificationException;
34import org.onosproject.store.service.NoSuchDocumentPathException;
35import org.onosproject.store.service.Ordering;
36import org.onosproject.store.service.Serializer;
37import org.onosproject.store.service.StorageService;
38import org.onosproject.store.service.Versioned;
39import org.onosproject.workflow.api.ContextEventMapStore;
40import org.onosproject.workflow.api.WorkflowException;
41import org.slf4j.Logger;
42
43import java.util.Map;
44import java.util.Objects;
45import java.util.Optional;
46import java.util.concurrent.CompletableFuture;
47import java.util.concurrent.ExecutionException;
48
49import static org.slf4j.LoggerFactory.getLogger;
50
Ray Milkeydf521292018-10-04 15:13:33 -070051@Component(immediate = true, service = ContextEventMapStore.class)
jaegonkim6a7b5242018-09-12 23:09:42 +090052public class DistributedContextEventMapTreeStore implements ContextEventMapStore {
53
54 protected static final Logger log = getLogger(DistributedContextEventMapTreeStore.class);
55
Ray Milkeydf521292018-10-04 15:13:33 -070056 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090057 private CoreService coreService;
58
Ray Milkeydf521292018-10-04 15:13:33 -070059 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090060 private StorageService storageService;
61
62 private ApplicationId appId;
63
64 private AsyncDocumentTree<String> eventMapTree;
65
66 @Activate
67 public void activate() {
68
69 appId = coreService.registerApplication("org.onosproject.contexteventmapstore");
70 log.info("appId=" + appId);
71
72 KryoNamespace eventMapNamespace = KryoNamespace.newBuilder()
73 .register(KryoNamespaces.API)
74 .build();
75
76 eventMapTree = storageService.<String>documentTreeBuilder()
77 .withSerializer(Serializer.using(eventMapNamespace))
78 .withName("context-event-map-store")
79 .withOrdering(Ordering.INSERTION)
80 .buildDocumentTree();
81 log.info("Started");
82 }
83
84 @Deactivate
85 public void deactivate() {
86 eventMapTree.destroy();
87 log.info("Stopped");
88 }
89
90 @Override
91 public void registerEventMap(String eventType, String eventHint,
92 String contextName, String workletType) throws WorkflowException {
93 DocumentPath dpath = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint, contextName));
94 String currentWorkletType = completeVersioned(eventMapTree.get(dpath));
95 if (currentWorkletType == null) {
96 complete(eventMapTree.createRecursive(dpath, workletType));
97 } else {
98 complete(eventMapTree.replace(dpath, workletType, currentWorkletType));
99 }
100 }
101
102 @Override
103 public void unregisterEventMap(String eventType, String eventHint, String contextName) throws WorkflowException {
104 DocumentPath contextPath = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint, contextName));
105 complete(eventMapTree.removeNode(contextPath));
106 }
107
108 @Override
109 public Map<String, String> getEventMap(String eventType, String eventHint) throws WorkflowException {
110 DocumentPath path = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint));
111 Map<String, Versioned<String>> contexts = complete(eventMapTree.getChildren(path));
112 Map<String, String> eventMap = Maps.newHashMap();
113 if (Objects.isNull(contexts)) {
114 return eventMap;
115 }
116
117 for (Map.Entry<String, Versioned<String>> entry : contexts.entrySet()) {
118 eventMap.put(entry.getKey(), entry.getValue().value());
119 }
120 return eventMap;
121 }
122
123 @Override
124 public Map<String, Versioned<String>> getChildren(String path) throws WorkflowException {
125 DocumentPath dpath = DocumentPath.from(path);
126 Map<String, Versioned<String>> entries = complete(eventMapTree.getChildren(dpath));
127 return entries;
128 }
129
130 @Override
131 public DocumentPath getDocumentPath(String path) throws WorkflowException {
132 DocumentPath dpath = DocumentPath.from(path);
133 return dpath;
134 }
135
136 @Override
137 public ObjectNode asJsonTree() throws WorkflowException {
138
139 DocumentPath rootPath = DocumentPath.from(Lists.newArrayList("root"));
140 Map<String, Versioned<String>> eventmap = complete(eventMapTree.getChildren(rootPath));
141
142 ObjectNode rootNode = JsonNodeFactory.instance.objectNode();
143
144 for (Map.Entry<String, Versioned<String>> eventTypeEntry : eventmap.entrySet()) {
145
146 String eventType = eventTypeEntry.getKey();
147
148 ObjectNode eventTypeNode = JsonNodeFactory.instance.objectNode();
149 rootNode.put(eventType, eventTypeNode);
150
151 DocumentPath eventTypePath = DocumentPath.from(Lists.newArrayList("root", eventType));
152 Map<String, Versioned<String>> hintmap = complete(eventMapTree.getChildren(eventTypePath));
153
154 for (Map.Entry<String, Versioned<String>> hintEntry : hintmap.entrySet()) {
155
156 String hint = hintEntry.getKey();
157
158 ObjectNode hintNode = JsonNodeFactory.instance.objectNode();
159 eventTypeNode.put(hint, hintNode);
160
161 DocumentPath hintPath = DocumentPath.from(Lists.newArrayList("root", eventType, hint));
162 Map<String, Versioned<String>> contextmap = complete(eventMapTree.getChildren(hintPath));
163
164 for (Map.Entry<String, Versioned<String>> ctxtEntry : contextmap.entrySet()) {
165 hintNode.put(ctxtEntry.getKey(), ctxtEntry.getValue().value());
166 }
167 }
168 }
169
170 return rootNode;
171 }
172
173 private <T> T complete(CompletableFuture<T> future) throws WorkflowException {
174 try {
175 return future.get();
176 } catch (InterruptedException e) {
177 Thread.currentThread().interrupt();
178 throw new WorkflowException(e.getCause().getMessage());
179 } catch (ExecutionException e) {
180 if (e.getCause() instanceof IllegalDocumentModificationException) {
181 throw new WorkflowException("Node or parent does not exist or is root or is not a Leaf Node",
182 e.getCause());
183 } else if (e.getCause() instanceof NoSuchDocumentPathException) {
184 return null;
185 } else {
186 throw new WorkflowException("Datastore operation failed", e.getCause());
187 }
188 }
189 }
190
191 private <T> T completeVersioned(CompletableFuture<Versioned<T>> future) throws WorkflowException {
192 return Optional.ofNullable(complete(future))
193 .map(Versioned::value)
194 .orElse(null);
195 }
196}