blob: 12de3f0170e94d76ae3d71fdde2c2e0ba74d41b9 [file] [log] [blame]
jaegonkim6a7b5242018-09-12 23:09:42 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.workflow.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.node.ArrayNode;
20import com.fasterxml.jackson.databind.node.BaseJsonNode;
21import com.fasterxml.jackson.databind.node.BigIntegerNode;
22import com.fasterxml.jackson.databind.node.BinaryNode;
23import com.fasterxml.jackson.databind.node.BooleanNode;
24import com.fasterxml.jackson.databind.node.ContainerNode;
25import com.fasterxml.jackson.databind.node.DecimalNode;
26import com.fasterxml.jackson.databind.node.DoubleNode;
27import com.fasterxml.jackson.databind.node.FloatNode;
28import com.fasterxml.jackson.databind.node.IntNode;
29import com.fasterxml.jackson.databind.node.JsonNodeCreator;
30import com.fasterxml.jackson.databind.node.JsonNodeFactory;
31import com.fasterxml.jackson.databind.node.JsonNodeType;
32import com.fasterxml.jackson.databind.node.LongNode;
33import com.fasterxml.jackson.databind.node.MissingNode;
34import com.fasterxml.jackson.databind.node.NullNode;
35import com.fasterxml.jackson.databind.node.NumericNode;
36import com.fasterxml.jackson.databind.node.ObjectNode;
37import com.fasterxml.jackson.databind.node.POJONode;
38import com.fasterxml.jackson.databind.node.ShortNode;
39import com.fasterxml.jackson.databind.node.TextNode;
40import com.fasterxml.jackson.databind.node.ValueNode;
41import com.google.common.collect.ImmutableList;
42import com.google.common.collect.Maps;
Ray Milkeydf521292018-10-04 15:13:33 -070043import org.osgi.service.component.annotations.Activate;
44import org.osgi.service.component.annotations.Component;
45import org.osgi.service.component.annotations.Deactivate;
46import org.osgi.service.component.annotations.Reference;
47import org.osgi.service.component.annotations.ReferenceCardinality;
jaegonkim6a7b5242018-09-12 23:09:42 +090048import org.onlab.util.KryoNamespace;
49import org.onosproject.core.ApplicationId;
50import org.onosproject.core.CoreService;
51import org.onosproject.workflow.api.DataModelTree;
52import org.onosproject.workflow.api.DefaultWorkplace;
53import org.onosproject.workflow.api.DefaultWorkflowContext;
54import org.onosproject.workflow.api.JsonDataModelTree;
jaegonkime0f45b52018-10-09 20:23:26 +090055import org.onosproject.workflow.api.ProgramCounter;
jaegonkim6a7b5242018-09-12 23:09:42 +090056import org.onosproject.workflow.api.SystemWorkflowContext;
57import org.onosproject.workflow.api.WorkflowContext;
58import org.onosproject.workflow.api.WorkflowData;
59import org.onosproject.workflow.api.WorkflowState;
60import org.onosproject.workflow.api.Workplace;
61import org.onosproject.workflow.api.WorkflowDataEvent;
62import org.onosproject.workflow.api.WorkplaceStore;
63import org.onosproject.workflow.api.WorkplaceStoreDelegate;
64import org.onosproject.store.AbstractStore;
65import org.onosproject.store.serializers.KryoNamespaces;
66import org.onosproject.store.service.ConsistentMap;
67import org.onosproject.store.service.MapEvent;
68import org.onosproject.store.service.MapEventListener;
69import org.onosproject.store.service.Serializer;
70import org.onosproject.store.service.StorageException;
71import org.onosproject.store.service.StorageService;
72import org.onosproject.store.service.Versioned;
73import org.slf4j.Logger;
74
75import java.util.ArrayList;
76import java.util.Collection;
77import java.util.Collections;
78import java.util.HashMap;
79import java.util.LinkedHashMap;
80import java.util.List;
81import java.util.Map;
82
83import static org.slf4j.LoggerFactory.getLogger;
84
Ray Milkeydf521292018-10-04 15:13:33 -070085@Component(immediate = true, service = WorkplaceStore.class)
jaegonkim6a7b5242018-09-12 23:09:42 +090086public class DistributedWorkplaceStore
87 extends AbstractStore<WorkflowDataEvent, WorkplaceStoreDelegate> implements WorkplaceStore {
88
Ray Milkeydf521292018-10-04 15:13:33 -070089 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090090 protected CoreService coreService;
91
Ray Milkeydf521292018-10-04 15:13:33 -070092 @Reference(cardinality = ReferenceCardinality.MANDATORY)
jaegonkim6a7b5242018-09-12 23:09:42 +090093 protected StorageService storageService;
94
95 private ApplicationId appId;
96 private final Logger log = getLogger(getClass());
97
98 private final WorkplaceMapListener workplaceMapEventListener = new WorkplaceMapListener();
99 private ConsistentMap<String, WorkflowData> workplaceMap;
100 private Map<String, Workplace> localWorkplaceMap = Maps.newConcurrentMap();
101
102 private final WorkflowContextMapListener contextMapEventListener = new WorkflowContextMapListener();
103 private ConsistentMap<String, WorkflowData> contextMap;
104 private Map<String, WorkflowContext> localContextMap = Maps.newConcurrentMap();
105
106 private Map<String, Map<String, WorkflowContext>> localWorkplaceMemberMap = Maps.newConcurrentMap();
107
108 @Activate
109 public void activate() {
110
111 appId = coreService.registerApplication("org.onosproject.workplacestore");
112 log.info("appId=" + appId);
113
114 KryoNamespace workplaceNamespace = KryoNamespace.newBuilder()
115 .register(KryoNamespaces.API)
116 .register(WorkflowData.class)
117 .register(Workplace.class)
118 .register(DefaultWorkplace.class)
119 .register(WorkflowContext.class)
120 .register(DefaultWorkflowContext.class)
121 .register(SystemWorkflowContext.class)
122 .register(WorkflowState.class)
jaegonkime0f45b52018-10-09 20:23:26 +0900123 .register(ProgramCounter.class)
jaegonkim6a7b5242018-09-12 23:09:42 +0900124 .register(DataModelTree.class)
125 .register(JsonDataModelTree.class)
126 .register(List.class)
127 .register(ArrayList.class)
128 .register(JsonNode.class)
129 .register(ObjectNode.class)
130 .register(TextNode.class)
131 .register(LinkedHashMap.class)
132 .register(ArrayNode.class)
133 .register(BaseJsonNode.class)
134 .register(BigIntegerNode.class)
135 .register(BinaryNode.class)
136 .register(BooleanNode.class)
137 .register(ContainerNode.class)
138 .register(DecimalNode.class)
139 .register(DoubleNode.class)
140 .register(FloatNode.class)
141 .register(IntNode.class)
142 .register(JsonNodeType.class)
143 .register(LongNode.class)
144 .register(MissingNode.class)
145 .register(NullNode.class)
146 .register(NumericNode.class)
147 .register(POJONode.class)
148 .register(ShortNode.class)
149 .register(ValueNode.class)
150 .register(JsonNodeCreator.class)
151 .register(JsonNodeFactory.class)
152 .build();
153
154 localWorkplaceMap.clear();
155 workplaceMap = storageService.<String, WorkflowData>consistentMapBuilder()
156 .withSerializer(Serializer.using(workplaceNamespace))
157 .withName("workplace-map")
158 .withApplicationId(appId)
159 .build();
160 workplaceMap.addListener(workplaceMapEventListener);
161
162 localContextMap.clear();
163 contextMap = storageService.<String, WorkflowData>consistentMapBuilder()
164 .withSerializer(Serializer.using(workplaceNamespace))
165 .withName("workflow-context-map")
166 .withApplicationId(appId)
167 .build();
168 contextMap.addListener(contextMapEventListener);
169
170 workplaceMapEventListener.syncLocal();
171 contextMapEventListener.syncLocal();
172 log.info("Started");
173 }
174
175 @Deactivate
176 public void deactivate() {
177 workplaceMap.destroy();
178 localWorkplaceMap.clear();
179 contextMap.destroy();
180 localContextMap.clear();
181
182 log.info("Stopped");
183 }
184
185 @Override
186 public void registerWorkplace(String name, Workplace workplace) throws StorageException {
187 workplaceMap.put(name, workplace);
188 }
189
190 @Override
191 public void removeWorkplace(String name) throws StorageException {
192 removeWorkplaceContexts(name);
193 workplaceMap.remove(name);
194 }
195
196 @Override
197 public Workplace getWorkplace(String name) throws StorageException {
198 return localWorkplaceMap.get(name);
199 }
200
201 @Override
202 public Collection<Workplace> getWorkplaces() throws StorageException {
203 return ImmutableList.copyOf(localWorkplaceMap.values());
204 }
205
206 @Override
207 public void commitWorkplace(String name, Workplace workplace, boolean handleEvent) throws StorageException {
208 workplace.setTriggerNext(handleEvent);
209 if (workplaceMap.containsKey(name)) {
210 workplaceMap.replace(name, workplace);
211 } else {
212 registerWorkplace(name, workplace);
213 }
214 }
215
216 @Override
217 public void registerContext(String name, WorkflowContext context) throws StorageException {
218 contextMap.put(name, context);
219 }
220
221 @Override
222 public void removeContext(String name) throws StorageException {
223 contextMap.remove(name);
224 }
225
226 @Override
227 public WorkflowContext getContext(String name) throws StorageException {
228 return localContextMap.get(name);
229 }
230
231 @Override
232 public void commitContext(String name, WorkflowContext context, boolean handleEvent) throws StorageException {
233 context.setTriggerNext(handleEvent);
234 if (contextMap.containsKey(name)) {
235 contextMap.replace(name, context);
236 } else {
237 registerContext(name, context);
238 }
239 }
240
241 @Override
242 public Collection<WorkflowContext> getContexts() throws StorageException {
243 return ImmutableList.copyOf(localContextMap.values());
244 }
245
246 @Override
247 public Collection<WorkflowContext> getWorkplaceContexts(String workplaceName) {
248 Map<String, WorkflowContext> ctxMap = localWorkplaceMemberMap.get(workplaceName);
249 if (ctxMap == null) {
250 return Collections.emptyList();
251 }
252
253 return ImmutableList.copyOf(ctxMap.values());
254 }
255
256 @Override
257 public void removeWorkplaceContexts(String workplaceName) {
258 for (WorkflowContext ctx : getWorkplaceContexts(workplaceName)) {
259 removeContext(ctx.name());
260 }
261 }
262
263 private class WorkplaceMapListener implements MapEventListener<String, WorkflowData> {
264
265 @Override
266 public void event(MapEvent<String, WorkflowData> event) {
267
268 Workplace newWorkplace = (Workplace) Versioned.valueOrNull(event.newValue());
269 Workplace oldWorkplace = (Workplace) Versioned.valueOrNull(event.oldValue());
270
271 log.info("WorkplaceMap event: {}", event);
272 switch (event.type()) {
273 case INSERT:
274 insert(newWorkplace);
275 notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.INSERT, newWorkplace));
276 break;
277 case UPDATE:
278 update(newWorkplace);
279 notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.UPDATE, newWorkplace));
280 break;
281 case REMOVE:
282 remove(oldWorkplace);
283 notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.REMOVE, oldWorkplace));
284 break;
285 default:
286 }
287 }
288
289 private void insert(Workplace workplace) {
290 localWorkplaceMap.put(workplace.name(), workplace);
291 }
292
293 private void update(Workplace workplace) {
294 localWorkplaceMap.replace(workplace.name(), workplace);
295 }
296
297 private void remove(Workplace workplace) {
298 localWorkplaceMap.remove(workplace.name());
299 }
300
301 public void syncLocal() {
302 workplaceMap.values().stream().forEach(
303 x -> insert((Workplace) (x.value()))
304 );
305 }
306 }
307
308 private class WorkflowContextMapListener implements MapEventListener<String, WorkflowData> {
309
310 @Override
311 public void event(MapEvent<String, WorkflowData> event) {
312
313 WorkflowContext newContext = (WorkflowContext) Versioned.valueOrNull(event.newValue());
314 WorkflowContext oldContext = (WorkflowContext) Versioned.valueOrNull(event.oldValue());
315
jaegonkime0f45b52018-10-09 20:23:26 +0900316 log.debug("WorkflowContext event: {}", event);
jaegonkim6a7b5242018-09-12 23:09:42 +0900317 switch (event.type()) {
318 case INSERT:
319 insert(newContext);
320 notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.INSERT, newContext));
321 break;
322 case UPDATE:
323 update(newContext);
324 notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.UPDATE, newContext));
325 break;
326 case REMOVE:
327 remove(oldContext);
328 notifyDelegate(new WorkflowDataEvent(WorkflowDataEvent.Type.REMOVE, oldContext));
329 break;
330 default:
331 }
332 }
333
334 /**
335 * Inserts workflow context on local hash map.
336 * @param context workflow context
337 */
338 private void insert(WorkflowContext context) {
339 String workplaceName = context.workplaceName();
340 Map<String, WorkflowContext> ctxMap = localWorkplaceMemberMap.get(workplaceName);
341 if (ctxMap == null) {
342 ctxMap = new HashMap<>();
343 localWorkplaceMemberMap.put(workplaceName, ctxMap);
344 }
345 ctxMap.put(context.name(), context);
346
347 localContextMap.put(context.name(), context);
348 }
349
350 /**
351 * Updates workflow context on local hash map.
352 * @param context workflow context
353 */
354 private void update(WorkflowContext context) {
355 String workplaceName = context.workplaceName();
356 Map<String, WorkflowContext> ctxMap = localWorkplaceMemberMap.get(workplaceName);
357 if (ctxMap == null) {
358 ctxMap = new HashMap<>();
359 localWorkplaceMemberMap.put(workplaceName, ctxMap);
360 }
361 ctxMap.put(context.name(), context);
362
363 localContextMap.put(context.name(), context);
364 }
365
366 /**
367 * Removes workflow context from local hash map.
368 * @param context workflow context
369 */
370 private void remove(WorkflowContext context) {
371 localContextMap.remove(context.name());
372
373 String workplaceName = context.workplaceName();
374 Map<String, WorkflowContext> ctxMap = localWorkplaceMemberMap.get(workplaceName);
375 if (ctxMap == null) {
376 log.error("remove-context: Failed to find workplace({}) in localWorkplaceMemberMap", workplaceName);
377 return;
378 }
379 ctxMap.remove(context.name());
380 if (ctxMap.size() == 0) {
381 localWorkplaceMemberMap.remove(workplaceName, ctxMap);
382 }
383 }
384
385 /**
386 * Synchronizes local hash map.
387 */
388 public void syncLocal() {
389 contextMap.values().stream().forEach(
390 x -> insert((WorkflowContext) (x.value()))
391 );
392 }
393 }
394}