/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.node.JsonNodeFactory;
19import com.fasterxml.jackson.databind.node.ObjectNode;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.apache.felix.scr.annotations.Service;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.core.ApplicationId;
30import org.onosproject.core.CoreService;
31import org.onosproject.store.serializers.KryoNamespaces;
32import org.onosproject.store.service.AsyncDocumentTree;
33import org.onosproject.store.service.DocumentPath;
34import org.onosproject.store.service.IllegalDocumentModificationException;
35import org.onosproject.store.service.NoSuchDocumentPathException;
36import org.onosproject.store.service.Ordering;
37import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.StorageService;
39import org.onosproject.store.service.Versioned;
40import org.onosproject.workflow.api.ContextEventMapStore;
41import org.onosproject.workflow.api.WorkflowException;
42import org.slf4j.Logger;
43
44import java.util.Map;
45import java.util.Objects;
46import java.util.Optional;
47import java.util.concurrent.CompletableFuture;
48import java.util.concurrent.ExecutionException;
49
50import static org.slf4j.LoggerFactory.getLogger;
51
52@Component(immediate = true)
53@Service
54public class DistributedContextEventMapTreeStore implements ContextEventMapStore {
55
56 protected static final Logger log = getLogger(DistributedContextEventMapTreeStore.class);
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 private CoreService coreService;
60
61 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 private StorageService storageService;
63
64 private ApplicationId appId;
65
66 private AsyncDocumentTree<String> eventMapTree;
67
68 @Activate
69 public void activate() {
70
71 appId = coreService.registerApplication("org.onosproject.contexteventmapstore");
72 log.info("appId=" + appId);
73
74 KryoNamespace eventMapNamespace = KryoNamespace.newBuilder()
75 .register(KryoNamespaces.API)
76 .build();
77
78 eventMapTree = storageService.<String>documentTreeBuilder()
79 .withSerializer(Serializer.using(eventMapNamespace))
80 .withName("context-event-map-store")
81 .withOrdering(Ordering.INSERTION)
82 .buildDocumentTree();
83 log.info("Started");
84 }
85
86 @Deactivate
87 public void deactivate() {
88 eventMapTree.destroy();
89 log.info("Stopped");
90 }
91
92 @Override
93 public void registerEventMap(String eventType, String eventHint,
94 String contextName, String workletType) throws WorkflowException {
95 DocumentPath dpath = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint, contextName));
96 String currentWorkletType = completeVersioned(eventMapTree.get(dpath));
97 if (currentWorkletType == null) {
98 complete(eventMapTree.createRecursive(dpath, workletType));
99 } else {
100 complete(eventMapTree.replace(dpath, workletType, currentWorkletType));
101 }
102 }
103
104 @Override
105 public void unregisterEventMap(String eventType, String eventHint, String contextName) throws WorkflowException {
106 DocumentPath contextPath = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint, contextName));
107 complete(eventMapTree.removeNode(contextPath));
108 }
109
110 @Override
111 public Map<String, String> getEventMap(String eventType, String eventHint) throws WorkflowException {
112 DocumentPath path = DocumentPath.from(Lists.newArrayList("root", eventType, eventHint));
113 Map<String, Versioned<String>> contexts = complete(eventMapTree.getChildren(path));
114 Map<String, String> eventMap = Maps.newHashMap();
115 if (Objects.isNull(contexts)) {
116 return eventMap;
117 }
118
119 for (Map.Entry<String, Versioned<String>> entry : contexts.entrySet()) {
120 eventMap.put(entry.getKey(), entry.getValue().value());
121 }
122 return eventMap;
123 }
124
125 @Override
126 public Map<String, Versioned<String>> getChildren(String path) throws WorkflowException {
127 DocumentPath dpath = DocumentPath.from(path);
128 Map<String, Versioned<String>> entries = complete(eventMapTree.getChildren(dpath));
129 return entries;
130 }
131
132 @Override
133 public DocumentPath getDocumentPath(String path) throws WorkflowException {
134 DocumentPath dpath = DocumentPath.from(path);
135 return dpath;
136 }
137
138 @Override
139 public ObjectNode asJsonTree() throws WorkflowException {
140
141 DocumentPath rootPath = DocumentPath.from(Lists.newArrayList("root"));
142 Map<String, Versioned<String>> eventmap = complete(eventMapTree.getChildren(rootPath));
143
144 ObjectNode rootNode = JsonNodeFactory.instance.objectNode();
145
146 for (Map.Entry<String, Versioned<String>> eventTypeEntry : eventmap.entrySet()) {
147
148 String eventType = eventTypeEntry.getKey();
149
150 ObjectNode eventTypeNode = JsonNodeFactory.instance.objectNode();
151 rootNode.put(eventType, eventTypeNode);
152
153 DocumentPath eventTypePath = DocumentPath.from(Lists.newArrayList("root", eventType));
154 Map<String, Versioned<String>> hintmap = complete(eventMapTree.getChildren(eventTypePath));
155
156 for (Map.Entry<String, Versioned<String>> hintEntry : hintmap.entrySet()) {
157
158 String hint = hintEntry.getKey();
159
160 ObjectNode hintNode = JsonNodeFactory.instance.objectNode();
161 eventTypeNode.put(hint, hintNode);
162
163 DocumentPath hintPath = DocumentPath.from(Lists.newArrayList("root", eventType, hint));
164 Map<String, Versioned<String>> contextmap = complete(eventMapTree.getChildren(hintPath));
165
166 for (Map.Entry<String, Versioned<String>> ctxtEntry : contextmap.entrySet()) {
167 hintNode.put(ctxtEntry.getKey(), ctxtEntry.getValue().value());
168 }
169 }
170 }
171
172 return rootNode;
173 }
174
175 private <T> T complete(CompletableFuture<T> future) throws WorkflowException {
176 try {
177 return future.get();
178 } catch (InterruptedException e) {
179 Thread.currentThread().interrupt();
180 throw new WorkflowException(e.getCause().getMessage());
181 } catch (ExecutionException e) {
182 if (e.getCause() instanceof IllegalDocumentModificationException) {
183 throw new WorkflowException("Node or parent does not exist or is root or is not a Leaf Node",
184 e.getCause());
185 } else if (e.getCause() instanceof NoSuchDocumentPathException) {
186 return null;
187 } else {
188 throw new WorkflowException("Datastore operation failed", e.getCause());
189 }
190 }
191 }
192
193 private <T> T completeVersioned(CompletableFuture<Versioned<T>> future) throws WorkflowException {
194 return Optional.ofNullable(complete(future))
195 .map(Versioned::value)
196 .orElse(null);
197 }
198}