blob: 31fe0446b8ce19972dc004a6dd119357fb6ae810 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.node.ArrayNode;
20import com.fasterxml.jackson.databind.node.BaseJsonNode;
21import com.fasterxml.jackson.databind.node.BigIntegerNode;
22import com.fasterxml.jackson.databind.node.BinaryNode;
23import com.fasterxml.jackson.databind.node.BooleanNode;
24import com.fasterxml.jackson.databind.node.ContainerNode;
25import com.fasterxml.jackson.databind.node.DecimalNode;
26import com.fasterxml.jackson.databind.node.DoubleNode;
27import com.fasterxml.jackson.databind.node.FloatNode;
28import com.fasterxml.jackson.databind.node.IntNode;
29import com.fasterxml.jackson.databind.node.JsonNodeCreator;
30import com.fasterxml.jackson.databind.node.JsonNodeFactory;
31import com.fasterxml.jackson.databind.node.JsonNodeType;
32import com.fasterxml.jackson.databind.node.LongNode;
33import com.fasterxml.jackson.databind.node.MissingNode;
34import com.fasterxml.jackson.databind.node.NullNode;
35import com.fasterxml.jackson.databind.node.NumericNode;
36import com.fasterxml.jackson.databind.node.ObjectNode;
37import com.fasterxml.jackson.databind.node.POJONode;
38import com.fasterxml.jackson.databind.node.ShortNode;
39import com.fasterxml.jackson.databind.node.TextNode;
40import com.fasterxml.jackson.databind.node.ValueNode;
41import com.google.common.collect.ImmutableList;
42import com.google.common.collect.Maps;
Ray Milkeydf521292018-10-04 15:13:33 -070043import org.osgi.service.component.annotations.Activate;
44import org.osgi.service.component.annotations.Component;
45import org.osgi.service.component.annotations.Deactivate;
46import org.osgi.service.component.annotations.Reference;
47import org.osgi.service.component.annotations.ReferenceCardinality;
jaegonkim6a7b5242018-09-12 23:09:42 +090048import org.onlab.util.KryoNamespace;
49import org.onosproject.core.ApplicationId;
50import org.onosproject.core.CoreService;
51import org.onosproject.workflow.api.DataModelTree;
52import org.onosproject.workflow.api.DefaultWorkplace;
53import org.onosproject.workflow.api.DefaultWorkflowContext;
54import org.onosproject.workflow.api.JsonDataModelTree;
55import org.onosproject.workflow.api.SystemWorkflowContext;
56import org.onosproject.workflow.api.WorkflowContext;
57import org.onosproject.workflow.api.WorkflowData;
58import org.onosproject.workflow.api.WorkflowState;
59import org.onosproject.workflow.api.Workplace;
60import org.onosproject.workflow.api.WorkflowDataEvent;
61import org.onosproject.workflow.api.WorkplaceStore;
62import org.onosproject.workflow.api.WorkplaceStoreDelegate;
63import org.onosproject.store.AbstractStore;
64import org.onosproject.store.serializers.KryoNamespaces;
65import org.onosproject.store.service.ConsistentMap;
66import org.onosproject.store.service.MapEvent;
67import org.onosproject.store.service.MapEventListener;
68import org.onosproject.store.service.Serializer;
69import org.onosproject.store.service.StorageException;
70import org.onosproject.store.service.StorageService;
71import org.onosproject.store.service.Versioned;
72import org.slf4j.Logger;
73
74import java.util.ArrayList;
75import java.util.Collection;
76import java.util.Collections;
77import java.util.HashMap;
78import java.util.LinkedHashMap;
79import java.util.List;
80import java.util.Map;
81
82import static org.slf4j.LoggerFactory.getLogger;
83
Ray Milkeydf521292018-10-04 15:13:33 -070084@Component(immediate = true, service = WorkplaceStore.class)
jaegonkim6a7b5242018-09-12 23:09:42 +090085public class DistributedWorkplaceStore
86 extends AbstractStore<WorkflowDataEvent, WorkplaceStoreDelegate> implements WorkplaceStore {
87
Ray Milkeydf521292018-10-04 15:13:33 -070088 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090089 protected CoreService coreService;
90
Ray Milkeydf521292018-10-04 15:13:33 -070091 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090092 protected StorageService storageService;
93
94 private ApplicationId appId;
95 private final Logger log = getLogger(getClass());
96
97 private final WorkplaceMapListener workplaceMapEventListener = new WorkplaceMapListener();
98 private ConsistentMap<String, WorkflowData> workplaceMap;
99 private Map<String, Workplace> localWorkplaceMap = Maps.newConcurrentMap();
100
101 private final WorkflowContextMapListener contextMapEventListener = new WorkflowContextMapListener();
102 private ConsistentMap<String, WorkflowData> contextMap;
103 private Map<String, WorkflowContext> localContextMap = Maps.newConcurrentMap();
104
105 private Map<String, Map<String, WorkflowContext>> localWorkplaceMemberMap = Maps.newConcurrentMap();
106
107 @Activate
108 public void activate() {
109
110 appId = coreService.registerApplication("org.onosproject.workplacestore");
111 log.info("appId=" + appId);
112
113 KryoNamespace workplaceNamespace = KryoNamespace.newBuilder()
114 .register(KryoNamespaces.API)
115 .register(WorkflowData.class)
116 .register(Workplace.class)
117 .register(DefaultWorkplace.class)
118 .register(WorkflowContext.class)
119 .register(DefaultWorkflowContext.class)
120 .register(SystemWorkflowContext.class)
121 .register(WorkflowState.class)
122 .register(DataModelTree.class)
123 .register(JsonDataModelTree.class)
124 .register(List.class)
125 .register(ArrayList.class)
126 .register(JsonNode.class)
127 .register(ObjectNode.class)
128 .register(TextNode.class)
129 .register(LinkedHashMap.class)
130 .register(ArrayNode.class)
131 .register(BaseJsonNode.class)
132 .register(BigIntegerNode.class)
133 .register(BinaryNode.class)
134 .register(BooleanNode.class)
135 .register(ContainerNode.class)
136 .register(DecimalNode.class)
137 .register(DoubleNode.class)
138 .register(FloatNode.class)
139 .register(IntNode.class)
140 .register(JsonNodeType.class)
141 .register(LongNode.class)
142 .register(MissingNode.class)
143 .register(NullNode.class)
144 .register(NumericNode.class)
145 .register(POJONode.class)
146 .register(ShortNode.class)
147 .register(ValueNode.class)
148 .register(JsonNodeCreator.class)
149 .register(JsonNodeFactory.class)
150 .build();
151
152 localWorkplaceMap.clear();
153 workplaceMap = storageService.<String, WorkflowData>consistentMapBuilder()
154 .withSerializer(Serializer.using(workplaceNamespace))
155 .withName("workplace-map")
156 .withApplicationId(appId)
157 .build();
158 workplaceMap.addListener(workplaceMapEventListener);
159
160 localContextMap.clear();
161 contextMap = storageService.<String, WorkflowData>consistentMapBuilder()
162 .withSerializer(Serializer.using(workplaceNamespace))
163 .withName("workflow-context-map")
164 .withApplicationId(appId)
165 .build();
166 contextMap.addListener(contextMapEventListener);
167
168 workplaceMapEventListener.syncLocal();
169 contextMapEventListener.syncLocal();
170 log.info("Started");
171 }
172
173 @Deactivate
174 public void deactivate() {
175 workplaceMap.destroy();
176 localWorkplaceMap.clear();
177 contextMap.destroy();
178 localContextMap.clear();
179
180 log.info("Stopped");
181 }
182
183 @Override
184 public void registerWorkplace(String name, Workplace workplace) throws StorageException {
185 workplaceMap.put(name, workplace);
186 }
187
188 @Override
189 public void removeWorkplace(String name) throws StorageException {
190 removeWorkplaceContexts(name);
191 workplaceMap.remove(name);
192 }
193
194 @Override
195 public Workplace getWorkplace(String name) throws StorageException {
196 return localWorkplaceMap.get(name);
197 }
198
199 @Override
200 public Collection<Workplace> getWorkplaces() throws StorageException {
201 return ImmutableList.copyOf(localWorkplaceMap.values());
202 }
203
204 @Override
205 public void commitWorkplace(String name, Workplace workplace, boolean handleEvent) throws StorageException {
206 workplace.setTriggerNext(handleEvent);
207 if (workplaceMap.containsKey(name)) {
208 workplaceMap.replace(name, workplace);
209 } else {
210 registerWorkplace(name, workplace);
211 }
212 }
213
214 @Override
215 public void registerContext(String name, WorkflowContext context) throws StorageException {
216 contextMap.put(name, context);
217 }
218
219 @Override
220 public void removeContext(String name) throws StorageException {
221 contextMap.remove(name);
222 }
223
224 @Override
225 public WorkflowContext getContext(String name) throws StorageException {
226 return localContextMap.get(name);
227 }
228
229 @Override
230 public void commitContext(String name, WorkflowContext context, boolean handleEvent) throws StorageException {
231 context.setTriggerNext(handleEvent);
232 if (contextMap.containsKey(name)) {
233 contextMap.replace(name, context);
234 } else {
235 registerContext(name, context);
236 }
237 }
238
239 @Override
240 public Collection<WorkflowContext> getContexts() throws StorageException {
241 return ImmutableList.copyOf(localContextMap.values());
242 }
243
244 @Override
245 public Collection<WorkflowContext> getWorkplaceContexts(String workplaceName) {
246 Map<String, WorkflowContext> ctxMap = localWorkplaceMemberMap.get(workplaceName);
247 if (ctxMap == null) {
248 return Collections.emptyList();
249 }
250
251 return ImmutableList.copyOf(ctxMap.values());
252 }
253
254 @Override
255 public void removeWorkplaceContexts(String workplaceName) {
256 for (WorkflowContext ctx : getWorkplaceContexts(workplaceName)) {
257 removeContext(ctx.name());
258 }
259 }
260
261 private class WorkplaceMapListener implements MapEventListener<String, WorkflowData> {
262
263 @Override
264 public void event(MapEvent<String, WorkflowData> event) {
265
266 Workplace newWorkplace = (Workplace) Versioned.valueOrNull(event.newValue());
267 Workplace oldWorkplace = (Workplace) Versioned.valueOrNull(event.oldValue());
268
269 log.info("WorkplaceMap event: {}", event);
270 switch (event.type()) {
271 case INSERT:
272 insert(newWorkplace);
273 notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.INSERT, newWorkplace));
274 break;
275 case UPDATE:
276 update(newWorkplace);
277 notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.UPDATE, newWorkplace));
278 break;
279 case REMOVE:
280 remove(oldWorkplace);
281 notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.REMOVE, oldWorkplace));
282 break;
283 default:
284 }
285 }
286
287 private void insert(Workplace workplace) {
288 localWorkplaceMap.put(workplace.name(), workplace);
289 }
290
291 private void update(Workplace workplace) {
292 localWorkplaceMap.replace(workplace.name(), workplace);
293 }
294
295 private void remove(Workplace workplace) {
296 localWorkplaceMap.remove(workplace.name());
297 }
298
299 public void syncLocal() {
300 workplaceMap.values().stream().forEach(
301 x -> insert((Workplace) (x.value()))
302 );
303 }
304 }
305
306 private class WorkflowContextMapListener implements MapEventListener<String, WorkflowData> {
307
308 @Override
309 public void event(MapEvent<String, WorkflowData> event) {
310
311 WorkflowContext newContext = (WorkflowContext) Versioned.valueOrNull(event.newValue());
312 WorkflowContext oldContext = (WorkflowContext) Versioned.valueOrNull(event.oldValue());
313
314 log.info("WorkflowContext event: {}", event);
315 switch (event.type()) {
316 case INSERT:
317 insert(newContext);
318 notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.INSERT, newContext));
319 break;
320 case UPDATE:
321 update(newContext);
322 notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.UPDATE, newContext));
323 break;
324 case REMOVE:
325 remove(oldContext);
326 notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.REMOVE, oldContext));
327 break;
328 default:
329 }
330 }
331
332 /**
333 * Inserts workflow context on local hash map.
334 * @param context workflow context
335 */
336 private void insert(WorkflowContext context) {
337 String workplaceName = context.workplaceName();
338 Map<String, WorkflowContext> ctxMap = localWorkplaceMemberMap.get(workplaceName);
339 if (ctxMap == null) {
340 ctxMap = new HashMap<>();
341 localWorkplaceMemberMap.put(workplaceName, ctxMap);
342 }
343 ctxMap.put(context.name(), context);
344
345 localContextMap.put(context.name(), context);
346 }
347
348 /**
349 * Updates workflow context on local hash map.
350 * @param context workflow context
351 */
352 private void update(WorkflowContext context) {
353 String workplaceName = context.workplaceName();
354 Map<String, WorkflowContext> ctxMap = localWorkplaceMemberMap.get(workplaceName);
355 if (ctxMap == null) {
356 ctxMap = new HashMap<>();
357 localWorkplaceMemberMap.put(workplaceName, ctxMap);
358 }
359 ctxMap.put(context.name(), context);
360
361 localContextMap.put(context.name(), context);
362 }
363
364 /**
365 * Removes workflow context from local hash map.
366 * @param context workflow context
367 */
368 private void remove(WorkflowContext context) {
369 localContextMap.remove(context.name());
370
371 String workplaceName = context.workplaceName();
372 Map<String, WorkflowContext> ctxMap = localWorkplaceMemberMap.get(workplaceName);
373 if (ctxMap == null) {
374 log.error("remove-context: Failed to find workplace({}) in localWorkplaceMemberMap", workplaceName);
375 return;
376 }
377 ctxMap.remove(context.name());
378 if (ctxMap.size() == 0) {
379 localWorkplaceMemberMap.remove(workplaceName, ctxMap);
380 }
381 }
382
383 /**
384 * Synchronizes local hash map.
385 */
386 public void syncLocal() {
387 contextMap.values().stream().forEach(
388 x -> insert((WorkflowContext) (x.value()))
389 );
390 }
391 }
392}