blob: f560cd27a850056de174e5c341acf0c1c46d56e8 [file] [log] [blame]
Brian O'Connor72a034c2014-11-26 18:24:23 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.onos.store.hz;
17
18import com.hazelcast.core.IQueue;
19import com.hazelcast.core.ItemListener;
20import com.hazelcast.monitor.LocalQueueStats;
21import org.onlab.onos.store.serializers.StoreSerializer;
22
23import java.util.Collection;
24import java.util.Iterator;
25import java.util.concurrent.TimeUnit;
26
27import static com.google.common.base.Preconditions.checkNotNull;
28
29// TODO: implementation is incomplete
30
31/**
32 * Wrapper around IQueue<byte[]> which serializes/deserializes
33 * key and value using StoreSerializer.
34 *
35 * @param <T> type
36 */
37public class SQueue<T> implements IQueue<T> {
38
39 private final IQueue<byte[]> q;
40 private final StoreSerializer serializer;
41
42 /**
43 * Creates a SQueue instance.
44 *
45 * @param baseQueue base IQueue to use
46 * @param serializer serializer to use for both key and value
47 */
48 public SQueue(IQueue<byte[]> baseQueue, StoreSerializer serializer) {
49 this.q = checkNotNull(baseQueue);
50 this.serializer = checkNotNull(serializer);
51 }
52
53 private byte[] serialize(Object key) {
54 return serializer.encode(key);
55 }
56
57 private T deserialize(byte[] key) {
58 return serializer.decode(key);
59 }
60
61 @Override
62 public boolean add(T t) {
63 return q.add(serialize(t));
64 }
65
66 @Override
67 public boolean offer(T t) {
68 return q.offer(serialize(t));
69 }
70
71 @Override
72 public void put(T t) throws InterruptedException {
73 q.put(serialize(t));
74 }
75
76 @Override
77 public boolean offer(T t, long l, TimeUnit timeUnit) throws InterruptedException {
78 return q.offer(serialize(t), l, timeUnit);
79 }
80
81 @Override
82 public T take() throws InterruptedException {
83 return deserialize(q.take());
84 }
85
86 @Override
87 public T poll(long l, TimeUnit timeUnit) throws InterruptedException {
88 return deserialize(q.poll(l, timeUnit));
89 }
90
91 @Override
92 public int remainingCapacity() {
93 return q.remainingCapacity();
94 }
95
96 @Override
97 public boolean remove(Object o) {
98 return q.remove(serialize(o));
99 }
100
101 @Override
102 public boolean contains(Object o) {
103 return q.contains(serialize(o));
104 }
105
106 @Override
107 public int drainTo(Collection<? super T> collection) {
108 throw new UnsupportedOperationException();
109 }
110
111 @Override
112 public int drainTo(Collection<? super T> collection, int i) {
113 throw new UnsupportedOperationException();
114 }
115
116 @Override
117 public T remove() {
118 return deserialize(q.remove());
119 }
120
121 @Override
122 public T poll() {
123 return deserialize(q.poll());
124 }
125
126 @Override
127 public T element() {
128 return deserialize(q.element());
129 }
130
131 @Override
132 public T peek() {
133 return deserialize(q.peek());
134 }
135
136 @Override
137 public int size() {
138 return q.size();
139 }
140
141 @Override
142 public boolean isEmpty() {
143 return q.isEmpty();
144 }
145
146 @Override
147 public Iterator<T> iterator() {
148 throw new UnsupportedOperationException();
149 }
150
151 @Override
152 public Object[] toArray() {
153 throw new UnsupportedOperationException();
154 }
155
156 @Override
157 public <T1> T1[] toArray(T1[] t1s) {
158 throw new UnsupportedOperationException();
159 }
160
161 @Override
162 public boolean containsAll(Collection<?> collection) {
163 throw new UnsupportedOperationException();
164 }
165
166 @Override
167 public boolean addAll(Collection<? extends T> collection) {
168 throw new UnsupportedOperationException();
169 }
170
171 @Override
172 public boolean removeAll(Collection<?> collection) {
173 throw new UnsupportedOperationException();
174 }
175
176 @Override
177 public boolean retainAll(Collection<?> collection) {
178 throw new UnsupportedOperationException();
179 }
180
181 @Override
182 public void clear() {
183 q.clear();
184 }
185
186 @Override
187 public LocalQueueStats getLocalQueueStats() {
188 return q.getLocalQueueStats();
189 }
190
191 @Override
192 public String addItemListener(ItemListener<T> itemListener, boolean b) {
193 throw new UnsupportedOperationException();
194 }
195
196 @Override
197 public boolean removeItemListener(String s) {
198 throw new UnsupportedOperationException();
199 }
200
201 @Deprecated
202 @Override
203 public Object getId() {
204 return q.getId();
205 }
206
207 @Override
208 public String getPartitionKey() {
209 return q.getPartitionKey();
210 }
211
212 @Override
213 public String getName() {
214 return q.getName();
215 }
216
217 @Override
218 public String getServiceName() {
219 return q.getServiceName();
220 }
221
222 @Override
223 public void destroy() {
224 q.destroy();
225 }
226}