blob: 977470dbbde43f69b409ab857b2c2ed35bd25104 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.CatalystSerializable;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.copycat.Command;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueueStats;

import com.google.common.base.MoreObjects;
35
/**
 * {@link AtomixWorkQueue} resource state machine operations.
 */
39public final class AtomixWorkQueueCommands {
40
/**
 * Private constructor: this class is only a holder for the nested command types.
 */
private AtomixWorkQueueCommands() {
    // intentionally empty
}
43
44 /**
45 * Command to add a collection of tasks to the queue.
46 */
47 @SuppressWarnings("serial")
48 public static class Add implements Command<Void>, CatalystSerializable {
49
50 private Collection<byte[]> items;
51
52 private Add() {
53 }
54
55 public Add(Collection<byte[]> items) {
56 this.items = items;
57 }
58
59 @Override
60 public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
61 buffer.writeInt(items.size());
62 items.forEach(task -> serializer.writeObject(task, buffer));
63 }
64
65 @Override
66 public void readObject(BufferInput<?> buffer, Serializer serializer) {
67 items = IntStream.range(0, buffer.readInt())
68 .mapToObj(i -> serializer.<byte[]>readObject(buffer))
69 .collect(Collectors.toCollection(ArrayList::new));
70 }
71
72 public Collection<byte[]> items() {
73 return items;
74 }
75
76 @Override
Jordan Halterman57a3c3d2017-02-05 22:33:04 -080077 public CompactionMode compaction() {
78 return CompactionMode.QUORUM;
79 }
80
81 @Override
Madan Jampani35708a92016-07-06 10:48:19 -070082 public String toString() {
83 return MoreObjects.toStringHelper(getClass())
84 .add("items", items)
85 .toString();
86 }
87 }
88
89 /**
90 * Command to take a task from the queue.
91 */
92 @SuppressWarnings("serial")
93 public static class Take implements Command<Collection<Task<byte[]>>>, CatalystSerializable {
94
95 private int maxTasks;
96
97 private Take() {
98 }
99
100 public Take(int maxTasks) {
101 this.maxTasks = maxTasks;
102 }
103
104 @Override
105 public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
106 buffer.writeInt(maxTasks);
107 }
108
109 @Override
110 public void readObject(BufferInput<?> buffer, Serializer serializer) {
111 maxTasks = buffer.readInt();
112 }
113
114 public int maxTasks() {
115 return maxTasks;
116 }
117
118 @Override
Jordan Halterman57a3c3d2017-02-05 22:33:04 -0800119 public CompactionMode compaction() {
120 return CompactionMode.QUORUM;
121 }
122
123 @Override
Madan Jampani35708a92016-07-06 10:48:19 -0700124 public String toString() {
125 return MoreObjects.toStringHelper(getClass())
126 .add("maxTasks", maxTasks)
127 .toString();
128 }
129 }
130
131 @SuppressWarnings("serial")
132 public static class Stats implements Command<WorkQueueStats>, CatalystSerializable {
133
134 @Override
135 public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
136 }
137
138 @Override
139 public void readObject(BufferInput<?> buffer, Serializer serializer) {
140 }
141
142 @Override
143 public String toString() {
144 return MoreObjects.toStringHelper(getClass())
145 .toString();
146 }
147 }
148
149
150
151 @SuppressWarnings("serial")
152 public static class Register implements Command<Void>, CatalystSerializable {
153
154 @Override
155 public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
156 }
157
158 @Override
159 public void readObject(BufferInput<?> buffer, Serializer serializer) {
160 }
161
162 @Override
Jordan Halterman57a3c3d2017-02-05 22:33:04 -0800163 public CompactionMode compaction() {
164 return CompactionMode.QUORUM;
165 }
166
167 @Override
Madan Jampani35708a92016-07-06 10:48:19 -0700168 public String toString() {
169 return MoreObjects.toStringHelper(getClass())
170 .toString();
171 }
172 }
173
174 @SuppressWarnings("serial")
175 public static class Unregister implements Command<Void>, CatalystSerializable {
176
177 @Override
178 public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
179 }
180
181 @Override
182 public void readObject(BufferInput<?> buffer, Serializer serializer) {
183 }
184
185 @Override
Jordan Halterman57a3c3d2017-02-05 22:33:04 -0800186 public CompactionMode compaction() {
187 return CompactionMode.TOMBSTONE;
188 }
189
190 @Override
Madan Jampani35708a92016-07-06 10:48:19 -0700191 public String toString() {
192 return MoreObjects.toStringHelper(getClass())
193 .toString();
194 }
195 }
196
197 @SuppressWarnings("serial")
198 public static class Complete implements Command<Void>, CatalystSerializable {
199 private Collection<String> taskIds;
200
201 private Complete() {
202 }
203
204 public Complete(Collection<String> taskIds) {
205 this.taskIds = taskIds;
206 }
207
208 @Override
209 public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
210 serializer.writeObject(taskIds, buffer);
211 }
212
213 @Override
214 public void readObject(BufferInput<?> buffer, Serializer serializer) {
215 taskIds = serializer.readObject(buffer);
216 }
217
218 public Collection<String> taskIds() {
219 return taskIds;
220 }
221
222 @Override
Jordan Halterman57a3c3d2017-02-05 22:33:04 -0800223 public CompactionMode compaction() {
224 return CompactionMode.TOMBSTONE;
225 }
226
227 @Override
Madan Jampani35708a92016-07-06 10:48:19 -0700228 public String toString() {
229 return MoreObjects.toStringHelper(getClass())
230 .add("taskIds", taskIds)
231 .toString();
232 }
233 }
234
Madan Jampani819d61d2016-07-25 20:29:43 -0700235 @SuppressWarnings("serial")
236 public static class Clear implements Command<Void>, CatalystSerializable {
237
238 @Override
239 public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
240 }
241
242 @Override
243 public void readObject(BufferInput<?> buffer, Serializer serializer) {
244 }
245
246 @Override
Jordan Halterman57a3c3d2017-02-05 22:33:04 -0800247 public CompactionMode compaction() {
248 return CompactionMode.TOMBSTONE;
249 }
250
251 @Override
Madan Jampani819d61d2016-07-25 20:29:43 -0700252 public String toString() {
253 return MoreObjects.toStringHelper(getClass())
254 .toString();
255 }
256 }
257
Madan Jampani35708a92016-07-06 10:48:19 -0700258 /**
259 * Work queue command type resolver.
260 */
261 public static class TypeResolver implements SerializableTypeResolver {
262 @Override
263 public void resolve(SerializerRegistry registry) {
264 registry.register(Register.class, -960);
265 registry.register(Unregister.class, -961);
266 registry.register(Take.class, -962);
267 registry.register(Add.class, -963);
268 registry.register(Complete.class, -964);
269 registry.register(Stats.class, -965);
Madan Jampani819d61d2016-07-25 20:29:43 -0700270 registry.register(Clear.class, -966);
Madan Jampani35708a92016-07-06 10:48:19 -0700271 }
272 }
273}