blob: 29aced114d8cd131ddac3ad24d85e4dec0b4fe14 [file] [log] [blame]
Brian O'Connor72a034c2014-11-26 18:24:23 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.onos.store.hz;
17
alshabib978d2412014-11-29 15:29:15 -080018import com.google.common.collect.Lists;
Brian O'Connor72a034c2014-11-26 18:24:23 -080019import com.hazelcast.core.IQueue;
20import com.hazelcast.core.ItemListener;
21import com.hazelcast.monitor.LocalQueueStats;
22import org.onlab.onos.store.serializers.StoreSerializer;
23
24import java.util.Collection;
25import java.util.Iterator;
alshabib978d2412014-11-29 15:29:15 -080026import java.util.List;
Brian O'Connor72a034c2014-11-26 18:24:23 -080027import java.util.concurrent.TimeUnit;
28
29import static com.google.common.base.Preconditions.checkNotNull;
30
31// TODO: implementation is incomplete
32
33/**
34 * Wrapper around IQueue<byte[]> which serializes/deserializes
35 * key and value using StoreSerializer.
36 *
37 * @param <T> type
38 */
39public class SQueue<T> implements IQueue<T> {
40
41 private final IQueue<byte[]> q;
42 private final StoreSerializer serializer;
43
44 /**
45 * Creates a SQueue instance.
46 *
47 * @param baseQueue base IQueue to use
48 * @param serializer serializer to use for both key and value
49 */
50 public SQueue(IQueue<byte[]> baseQueue, StoreSerializer serializer) {
51 this.q = checkNotNull(baseQueue);
52 this.serializer = checkNotNull(serializer);
53 }
54
55 private byte[] serialize(Object key) {
56 return serializer.encode(key);
57 }
58
59 private T deserialize(byte[] key) {
60 return serializer.decode(key);
61 }
62
63 @Override
64 public boolean add(T t) {
65 return q.add(serialize(t));
66 }
67
68 @Override
69 public boolean offer(T t) {
70 return q.offer(serialize(t));
71 }
72
73 @Override
74 public void put(T t) throws InterruptedException {
75 q.put(serialize(t));
76 }
77
78 @Override
79 public boolean offer(T t, long l, TimeUnit timeUnit) throws InterruptedException {
80 return q.offer(serialize(t), l, timeUnit);
81 }
82
83 @Override
84 public T take() throws InterruptedException {
85 return deserialize(q.take());
86 }
87
88 @Override
89 public T poll(long l, TimeUnit timeUnit) throws InterruptedException {
90 return deserialize(q.poll(l, timeUnit));
91 }
92
93 @Override
94 public int remainingCapacity() {
95 return q.remainingCapacity();
96 }
97
98 @Override
99 public boolean remove(Object o) {
100 return q.remove(serialize(o));
101 }
102
103 @Override
104 public boolean contains(Object o) {
105 return q.contains(serialize(o));
106 }
107
108 @Override
109 public int drainTo(Collection<? super T> collection) {
110 throw new UnsupportedOperationException();
111 }
112
113 @Override
114 public int drainTo(Collection<? super T> collection, int i) {
115 throw new UnsupportedOperationException();
116 }
117
118 @Override
119 public T remove() {
120 return deserialize(q.remove());
121 }
122
123 @Override
124 public T poll() {
125 return deserialize(q.poll());
126 }
127
128 @Override
129 public T element() {
130 return deserialize(q.element());
131 }
132
133 @Override
134 public T peek() {
135 return deserialize(q.peek());
136 }
137
138 @Override
139 public int size() {
140 return q.size();
141 }
142
143 @Override
144 public boolean isEmpty() {
145 return q.isEmpty();
146 }
147
148 @Override
149 public Iterator<T> iterator() {
alshabib978d2412014-11-29 15:29:15 -0800150 List<T> list = Lists.newArrayList();
151 q.forEach(elem -> list.add(deserialize(elem)));
152 return list.iterator();
Brian O'Connor72a034c2014-11-26 18:24:23 -0800153 }
154
155 @Override
156 public Object[] toArray() {
157 throw new UnsupportedOperationException();
158 }
159
160 @Override
161 public <T1> T1[] toArray(T1[] t1s) {
162 throw new UnsupportedOperationException();
163 }
164
165 @Override
166 public boolean containsAll(Collection<?> collection) {
167 throw new UnsupportedOperationException();
168 }
169
170 @Override
171 public boolean addAll(Collection<? extends T> collection) {
172 throw new UnsupportedOperationException();
173 }
174
175 @Override
176 public boolean removeAll(Collection<?> collection) {
177 throw new UnsupportedOperationException();
178 }
179
180 @Override
181 public boolean retainAll(Collection<?> collection) {
182 throw new UnsupportedOperationException();
183 }
184
185 @Override
186 public void clear() {
187 q.clear();
188 }
189
190 @Override
191 public LocalQueueStats getLocalQueueStats() {
192 return q.getLocalQueueStats();
193 }
194
195 @Override
196 public String addItemListener(ItemListener<T> itemListener, boolean b) {
197 throw new UnsupportedOperationException();
198 }
199
200 @Override
201 public boolean removeItemListener(String s) {
202 throw new UnsupportedOperationException();
203 }
204
205 @Deprecated
206 @Override
207 public Object getId() {
208 return q.getId();
209 }
210
211 @Override
212 public String getPartitionKey() {
213 return q.getPartitionKey();
214 }
215
216 @Override
217 public String getName() {
218 return q.getName();
219 }
220
221 @Override
222 public String getServiceName() {
223 return q.getServiceName();
224 }
225
226 @Override
227 public void destroy() {
228 q.destroy();
229 }
230}