/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.workflow.impl;
17
18import com.google.common.collect.Maps;
19import org.onlab.util.AbstractAccumulator;
20import org.onosproject.workflow.api.WorkflowData;
21import org.onosproject.workflow.api.WorkflowBatchDelegate;
22
23import java.util.Collection;
24import java.util.List;
25import java.util.Map;
26import java.util.Timer;
27
28/**
29 * An accumulator for building batches of workflow operations. Only one batch should
30 * be in process per instance at a time.
31 */
32public class WorkflowAccumulator extends AbstractAccumulator<WorkflowData> {
33
34 private static final int DEFAULT_MAX_EVENTS = 1000;
35 private static final int DEFAULT_MAX_IDLE_MS = 10;
36 private static final int DEFAULT_MAX_BATCH_MS = 50;
37
38 private static final Timer TIMER = new Timer("onos-workflow-op-batching");
39
40 private final WorkflowBatchDelegate delegate;
41
42 private volatile boolean ready;
43
44 /**
45 * Creates an intent operation accumulator.
46 *
47 * @param delegate the intent batch delegate
48 */
49 protected WorkflowAccumulator(WorkflowBatchDelegate delegate) {
50 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
51 this.delegate = delegate;
52 // Assume that the delegate is ready for worklettype at the start
53 ready = true; //TODO validate the assumption that delegate is ready
54 }
55
56 @Override
57 public void processItems(List<WorkflowData> items) {
58 ready = false;
59 delegate.execute(reduce(items));
60 }
61
62 private Collection<WorkflowData> reduce(List<WorkflowData> ops) {
63 Map<String, WorkflowData> map = Maps.newHashMap();
64 for (WorkflowData op : ops) {
65 map.put(op.name(), op);
66 }
67 //TODO check the version... or maybe workplaceStore will handle this.
68 return map.values();
69 }
70
71 @Override
72 public boolean isReady() {
73 return ready;
74 }
75
76 /**
77 * Making accumulator to be ready.
78 */
79 public void ready() {
80 ready = true;
81 }
82}