blob: c58e41153c4d33b0b61a14fd42f0c1db932545a5 [file] [log] [blame]
Brian O'Connor72a034c2014-11-26 18:24:23 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.onos.store.hz;
17
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -080018import com.google.common.base.Function;
19import com.google.common.collect.FluentIterable;
Brian O'Connor72a034c2014-11-26 18:24:23 -080020import com.hazelcast.core.IQueue;
alshabiba9819bf2014-11-30 18:15:52 -080021import com.hazelcast.core.ItemEvent;
Brian O'Connor72a034c2014-11-26 18:24:23 -080022import com.hazelcast.core.ItemListener;
23import com.hazelcast.monitor.LocalQueueStats;
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -080024
Brian O'Connor72a034c2014-11-26 18:24:23 -080025import org.onlab.onos.store.serializers.StoreSerializer;
26
27import java.util.Collection;
28import java.util.Iterator;
29import java.util.concurrent.TimeUnit;
30
31import static com.google.common.base.Preconditions.checkNotNull;
32
33// TODO: implementation is incomplete
34
35/**
36 * Wrapper around IQueue<byte[]> which serializes/deserializes
37 * key and value using StoreSerializer.
38 *
39 * @param <T> type
40 */
41public class SQueue<T> implements IQueue<T> {
42
43 private final IQueue<byte[]> q;
44 private final StoreSerializer serializer;
45
46 /**
47 * Creates a SQueue instance.
48 *
49 * @param baseQueue base IQueue to use
50 * @param serializer serializer to use for both key and value
51 */
52 public SQueue(IQueue<byte[]> baseQueue, StoreSerializer serializer) {
53 this.q = checkNotNull(baseQueue);
54 this.serializer = checkNotNull(serializer);
55 }
56
57 private byte[] serialize(Object key) {
58 return serializer.encode(key);
59 }
60
61 private T deserialize(byte[] key) {
62 return serializer.decode(key);
63 }
64
65 @Override
66 public boolean add(T t) {
67 return q.add(serialize(t));
68 }
69
70 @Override
71 public boolean offer(T t) {
72 return q.offer(serialize(t));
73 }
74
75 @Override
76 public void put(T t) throws InterruptedException {
77 q.put(serialize(t));
78 }
79
80 @Override
81 public boolean offer(T t, long l, TimeUnit timeUnit) throws InterruptedException {
82 return q.offer(serialize(t), l, timeUnit);
83 }
84
85 @Override
86 public T take() throws InterruptedException {
87 return deserialize(q.take());
88 }
89
90 @Override
91 public T poll(long l, TimeUnit timeUnit) throws InterruptedException {
92 return deserialize(q.poll(l, timeUnit));
93 }
94
95 @Override
96 public int remainingCapacity() {
97 return q.remainingCapacity();
98 }
99
100 @Override
101 public boolean remove(Object o) {
102 return q.remove(serialize(o));
103 }
104
105 @Override
106 public boolean contains(Object o) {
107 return q.contains(serialize(o));
108 }
109
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800110 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800111 @Override
112 public int drainTo(Collection<? super T> collection) {
113 throw new UnsupportedOperationException();
114 }
115
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800116 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800117 @Override
118 public int drainTo(Collection<? super T> collection, int i) {
119 throw new UnsupportedOperationException();
120 }
121
122 @Override
123 public T remove() {
124 return deserialize(q.remove());
125 }
126
127 @Override
128 public T poll() {
129 return deserialize(q.poll());
130 }
131
132 @Override
133 public T element() {
134 return deserialize(q.element());
135 }
136
137 @Override
138 public T peek() {
139 return deserialize(q.peek());
140 }
141
142 @Override
143 public int size() {
144 return q.size();
145 }
146
147 @Override
148 public boolean isEmpty() {
149 return q.isEmpty();
150 }
151
152 @Override
153 public Iterator<T> iterator() {
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800154 return FluentIterable.from(q)
155 .transform(new DeserializeVal())
156 .iterator();
Brian O'Connor72a034c2014-11-26 18:24:23 -0800157 }
158
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800159 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800160 @Override
161 public Object[] toArray() {
162 throw new UnsupportedOperationException();
163 }
164
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800165 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800166 @Override
167 public <T1> T1[] toArray(T1[] t1s) {
168 throw new UnsupportedOperationException();
169 }
170
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800171 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800172 @Override
173 public boolean containsAll(Collection<?> collection) {
174 throw new UnsupportedOperationException();
175 }
176
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800177 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800178 @Override
179 public boolean addAll(Collection<? extends T> collection) {
180 throw new UnsupportedOperationException();
181 }
182
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800183 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800184 @Override
185 public boolean removeAll(Collection<?> collection) {
186 throw new UnsupportedOperationException();
187 }
188
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800189 @Deprecated // not implemented yet
Brian O'Connor72a034c2014-11-26 18:24:23 -0800190 @Override
191 public boolean retainAll(Collection<?> collection) {
192 throw new UnsupportedOperationException();
193 }
194
195 @Override
196 public void clear() {
197 q.clear();
198 }
199
200 @Override
201 public LocalQueueStats getLocalQueueStats() {
202 return q.getLocalQueueStats();
203 }
204
alshabiba9819bf2014-11-30 18:15:52 -0800205
Brian O'Connor72a034c2014-11-26 18:24:23 -0800206 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800207 public String addItemListener(ItemListener<T> itemListener, boolean withValue) {
208 ItemListener<byte[]> il = new ItemListener<byte[]>() {
209 @Override
210 public void itemAdded(ItemEvent<byte[]> item) {
211 itemListener.itemAdded(new ItemEvent<T>(getName(item),
212 item.getEventType(),
213 deserialize(item.getItem()),
214 item.getMember()));
215 }
216
217 @Override
218 public void itemRemoved(ItemEvent<byte[]> item) {
219 itemListener.itemRemoved(new ItemEvent<T>(getName(item),
220 item.getEventType(),
221 deserialize(item.getItem()),
222 item.getMember()));
223 }
224
225 private String getName(ItemEvent<byte[]> item) {
226 return (item.getSource() instanceof String) ?
227 (String) item.getSource() : item.getSource().toString();
228
229 }
230 };
231 return q.addItemListener(il, withValue);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800232 }
233
alshabiba9819bf2014-11-30 18:15:52 -0800234
Brian O'Connor72a034c2014-11-26 18:24:23 -0800235 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800236 public boolean removeItemListener(String registrationId) {
237 return q.removeItemListener(registrationId);
Brian O'Connor72a034c2014-11-26 18:24:23 -0800238 }
239
240 @Deprecated
241 @Override
242 public Object getId() {
243 return q.getId();
244 }
245
246 @Override
247 public String getPartitionKey() {
248 return q.getPartitionKey();
249 }
250
251 @Override
252 public String getName() {
253 return q.getName();
254 }
255
256 @Override
257 public String getServiceName() {
258 return q.getServiceName();
259 }
260
261 @Override
262 public void destroy() {
263 q.destroy();
264 }
Yuta HIGUCHIc9438f12014-11-29 17:14:39 -0800265
266 private final class DeserializeVal implements Function<byte[], T> {
267 @Override
268 public T apply(byte[] input) {
269 return deserialize(input);
270 }
271 }
Brian O'Connor72a034c2014-11-26 18:24:23 -0800272}