blob: 3a2abb86d0195590cf52824ce4e9efe165373f3d [file] [log] [blame]
Brian O'Connor72a034c2014-11-26 18:24:23 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.hz;
Brian O'Connor72a034c2014-11-26 18:24:23 -080017
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -080018import com.google.common.base.Function;
19import com.google.common.collect.FluentIterable;
Brian O'Connor72a034c2014-11-26 18:24:23 -080020import com.hazelcast.core.IQueue;
alshabiba9819bf2014-11-30 18:15:52 -080021import com.hazelcast.core.ItemEvent;
Brian O'Connor72a034c2014-11-26 18:24:23 -080022import com.hazelcast.core.ItemListener;
23import com.hazelcast.monitor.LocalQueueStats;
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -080024
Brian O'Connorabafb502014-12-02 22:26:20 -080025import org.onosproject.store.serializers.StoreSerializer;
Brian O'Connor72a034c2014-11-26 18:24:23 -080026
27import java.util.Collection;
28import java.util.Iterator;
29import java.util.concurrent.TimeUnit;
30
31import static com.google.common.base.Preconditions.checkNotNull;
32
Brian O'Connor72a034c2014-11-26 18:24:23 -080033/**
34 * Wrapper around IQueue<byte[]> which serializes/deserializes
35 * key and value using StoreSerializer.
36 *
37 * @param <T> type
38 */
39public class SQueue<T> implements IQueue<T> {
40
41 private final IQueue<byte[]> q;
42 private final StoreSerializer serializer;
43
44 /**
45 * Creates a SQueue instance.
46 *
47 * @param baseQueue base IQueue to use
48 * @param serializer serializer to use for both key and value
49 */
50 public SQueue(IQueue<byte[]> baseQueue, StoreSerializer serializer) {
51 this.q = checkNotNull(baseQueue);
52 this.serializer = checkNotNull(serializer);
53 }
54
55 private byte[] serialize(Object key) {
56 return serializer.encode(key);
57 }
58
59 private T deserialize(byte[] key) {
60 return serializer.decode(key);
61 }
62
63 @Override
64 public boolean add(T t) {
65 return q.add(serialize(t));
66 }
67
68 @Override
69 public boolean offer(T t) {
70 return q.offer(serialize(t));
71 }
72
73 @Override
74 public void put(T t) throws InterruptedException {
75 q.put(serialize(t));
76 }
77
78 @Override
79 public boolean offer(T t, long l, TimeUnit timeUnit) throws InterruptedException {
80 return q.offer(serialize(t), l, timeUnit);
81 }
82
83 @Override
84 public T take() throws InterruptedException {
85 return deserialize(q.take());
86 }
87
88 @Override
89 public T poll(long l, TimeUnit timeUnit) throws InterruptedException {
90 return deserialize(q.poll(l, timeUnit));
91 }
92
93 @Override
94 public int remainingCapacity() {
95 return q.remainingCapacity();
96 }
97
98 @Override
99 public boolean remove(Object o) {
100 return q.remove(serialize(o));
101 }
102
103 @Override
104 public boolean contains(Object o) {
105 return q.contains(serialize(o));
106 }
107
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800108 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800109 @Override
110 public int drainTo(Collection<? super T> collection) {
111 throw new UnsupportedOperationException();
112 }
113
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800114 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800115 @Override
116 public int drainTo(Collection<? super T> collection, int i) {
117 throw new UnsupportedOperationException();
118 }
119
120 @Override
121 public T remove() {
122 return deserialize(q.remove());
123 }
124
125 @Override
126 public T poll() {
127 return deserialize(q.poll());
128 }
129
130 @Override
131 public T element() {
132 return deserialize(q.element());
133 }
134
135 @Override
136 public T peek() {
137 return deserialize(q.peek());
138 }
139
140 @Override
141 public int size() {
142 return q.size();
143 }
144
145 @Override
146 public boolean isEmpty() {
147 return q.isEmpty();
148 }
149
150 @Override
151 public Iterator<T> iterator() {
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800152 return FluentIterable.from(q)
153 .transform(new DeserializeVal())
154 .iterator();
Brian O'Connor72a034c2014-11-26 18:24:23 -0800155 }
156
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800157 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800158 @Override
159 public Object[] toArray() {
160 throw new UnsupportedOperationException();
161 }
162
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800163 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800164 @Override
165 public <T1> T1[] toArray(T1[] t1s) {
166 throw new UnsupportedOperationException();
167 }
168
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800169 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800170 @Override
171 public boolean containsAll(Collection<?> collection) {
172 throw new UnsupportedOperationException();
173 }
174
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800175 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800176 @Override
177 public boolean addAll(Collection<? extends T> collection) {
178 throw new UnsupportedOperationException();
179 }
180
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800181 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800182 @Override
183 public boolean removeAll(Collection<?> collection) {
184 throw new UnsupportedOperationException();
185 }
186
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800187 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800188 @Override
189 public boolean retainAll(Collection<?> collection) {
190 throw new UnsupportedOperationException();
191 }
192
193 @Override
194 public void clear() {
195 q.clear();
196 }
197
198 @Override
199 public LocalQueueStats getLocalQueueStats() {
200 return q.getLocalQueueStats();
201 }
202
alshabiba9819bf2014-11-30 18:15:52 -0800203
Brian O'Connor72a034c2014-11-26 18:24:23 -0800204 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800205 public String addItemListener(ItemListener<T> itemListener, boolean withValue) {
206 ItemListener<byte[]> il = new ItemListener<byte[]>() {
207 @Override
208 public void itemAdded(ItemEvent<byte[]> item) {
209 itemListener.itemAdded(new ItemEvent<T>(getName(item),
210 item.getEventType(),
211 deserialize(item.getItem()),
212 item.getMember()));
213 }
214
215 @Override
216 public void itemRemoved(ItemEvent<byte[]> item) {
217 itemListener.itemRemoved(new ItemEvent<T>(getName(item),
218 item.getEventType(),
219 deserialize(item.getItem()),
220 item.getMember()));
221 }
222
223 private String getName(ItemEvent<byte[]> item) {
224 return (item.getSource() instanceof String) ?
225 (String) item.getSource() : item.getSource().toString();
226
227 }
228 };
229 return q.addItemListener(il, withValue);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800230 }
231
alshabiba9819bf2014-11-30 18:15:52 -0800232
Brian O'Connor72a034c2014-11-26 18:24:23 -0800233 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800234 public boolean removeItemListener(String registrationId) {
235 return q.removeItemListener(registrationId);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800236 }
237
238 @Deprecated
239 @Override
240 public Object getId() {
241 return q.getId();
242 }
243
244 @Override
245 public String getPartitionKey() {
246 return q.getPartitionKey();
247 }
248
249 @Override
250 public String getName() {
251 return q.getName();
252 }
253
254 @Override
255 public String getServiceName() {
256 return q.getServiceName();
257 }
258
259 @Override
260 public void destroy() {
261 q.destroy();
262 }
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800263
264 private final class DeserializeVal implements Function<byte[], T> {
265 @Override
266 public T apply(byte[] input) {
267 return deserialize(input);
268 }
269 }
Brian O'Connor72a034c2014-11-26 18:24:23 -0800270}