blob: 8c82780be92482099d4f5b47bd92d93690dc61eb [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import io.atomix.copycat.client.CopycatClient;
Madan Jampani65f24bb2016-03-15 15:16:18 -070019import io.atomix.resource.AbstractResource;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.ResourceTypeInfo;
21
22import java.util.Collection;
23import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080024import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080025import java.util.Map.Entry;
Madan Jampani65f24bb2016-03-15 15:16:18 -070026import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import java.util.Set;
28import java.util.concurrent.CompletableFuture;
29import java.util.concurrent.atomic.AtomicReference;
30import java.util.function.BiFunction;
31import java.util.function.Predicate;
32
33import org.onlab.util.Match;
Madan Jampanicadd70b2016-02-08 13:45:43 -080034import org.onosproject.store.primitives.TransactionId;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080035import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
36import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
37import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
38import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
39import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
40import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080050import org.onosproject.store.service.AsyncConsistentMap;
51import org.onosproject.store.service.MapEvent;
52import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080053import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054import org.onosproject.store.service.Versioned;
55
56import com.google.common.collect.Sets;
57
58/**
59 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
60 */
Madan Jampani65f24bb2016-03-15 15:16:18 -070061@ResourceTypeInfo(id = -151, factory = AtomixConsistentMapFactory.class)
62public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
Madan Jampani74da78b2016-02-09 21:18:36 -080063 implements AsyncConsistentMap<String, byte[]> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080064
65 private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
66
Madan Jampanifc981772016-02-16 09:46:42 -080067 public static final String CHANGE_SUBJECT = "changeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080068
Madan Jampani65f24bb2016-03-15 15:16:18 -070069 public AtomixConsistentMap(CopycatClient client, Properties properties) {
70 super(client, properties);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080071 }
72
73 @Override
74 public String name() {
75 return null;
76 }
77
78 @Override
79 public CompletableFuture<AtomixConsistentMap> open() {
80 return super.open().thenApply(result -> {
Madan Jampani3a9911c2016-02-21 11:25:45 -080081 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080082 return result;
83 });
84 }
85
Madan Jampanifc981772016-02-16 09:46:42 -080086 private void handleEvent(List<MapEvent<String, byte[]>> events) {
87 events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080088 }
89
90 @Override
91 public CompletableFuture<Boolean> isEmpty() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080092 return submit(new IsEmpty());
Madan Jampani5e5b3d62016-02-01 16:03:33 -080093 }
94
95 @Override
96 public CompletableFuture<Integer> size() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080097 return submit(new Size());
Madan Jampani5e5b3d62016-02-01 16:03:33 -080098 }
99
100 @Override
101 public CompletableFuture<Boolean> containsKey(String key) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800102 return submit(new ContainsKey(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800103 }
104
105 @Override
106 public CompletableFuture<Boolean> containsValue(byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800107 return submit(new ContainsValue(value));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800108 }
109
110 @Override
111 public CompletableFuture<Versioned<byte[]>> get(String key) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800112 return submit(new Get(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800113 }
114
115 @Override
116 public CompletableFuture<Set<String>> keySet() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800117 return submit(new KeySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800118 }
119
120 @Override
121 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800122 return submit(new Values());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800123 }
124
125 @Override
126 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800127 return submit(new EntrySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800128 }
129
130 @Override
131 @SuppressWarnings("unchecked")
132 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800133 return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800134 .whenComplete((r, e) -> throwIfLocked(r.status()))
135 .thenApply(v -> v.oldValue());
136 }
137
138 @Override
139 @SuppressWarnings("unchecked")
140 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800141 return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800142 .whenComplete((r, e) -> throwIfLocked(r.status()))
143 .thenApply(v -> v.newValue());
144 }
145
146 @Override
147 @SuppressWarnings("unchecked")
148 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800149 return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800150 .whenComplete((r, e) -> throwIfLocked(r.status()))
151 .thenApply(v -> v.oldValue());
152 }
153 @Override
154 @SuppressWarnings("unchecked")
155 public CompletableFuture<Versioned<byte[]>> remove(String key) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800156 return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800157 .whenComplete((r, e) -> throwIfLocked(r.status()))
158 .thenApply(v -> v.oldValue());
159 }
160
161 @Override
162 @SuppressWarnings("unchecked")
163 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800164 return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800165 .whenComplete((r, e) -> throwIfLocked(r.status()))
166 .thenApply(v -> v.updated());
167 }
168
169 @Override
170 @SuppressWarnings("unchecked")
171 public CompletableFuture<Boolean> remove(String key, long version) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800172 return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800173 .whenComplete((r, e) -> throwIfLocked(r.status()))
174 .thenApply(v -> v.updated());
175 }
176
177 @Override
178 @SuppressWarnings("unchecked")
179 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800180 return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800181 .whenComplete((r, e) -> throwIfLocked(r.status()))
182 .thenApply(v -> v.oldValue());
183 }
184
185 @Override
186 @SuppressWarnings("unchecked")
187 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800188 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800189 newValue,
190 Match.ifValue(oldValue),
191 Match.ANY))
192 .whenComplete((r, e) -> throwIfLocked(r.status()))
193 .thenApply(v -> v.updated());
194 }
195
196 @Override
197 @SuppressWarnings("unchecked")
198 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800199 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800200 newValue,
201 Match.ANY,
202 Match.ifValue(oldVersion)))
203 .whenComplete((r, e) -> throwIfLocked(r.status()))
204 .thenApply(v -> v.updated());
205 }
206
207 @Override
208 public CompletableFuture<Void> clear() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800209 return submit(new Clear())
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800210 .whenComplete((r, e) -> throwIfLocked(r))
211 .thenApply(v -> null);
212 }
213
214 @Override
215 @SuppressWarnings("unchecked")
216 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
217 Predicate<? super byte[]> condition,
218 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
219 return get(key).thenCompose(r1 -> {
220 byte[] existingValue = r1 == null ? null : r1.value();
221 // if the condition evaluates to false, return existing value.
222 if (!condition.test(existingValue)) {
223 return CompletableFuture.completedFuture(r1);
224 }
225
226 AtomicReference<byte[]> computedValue = new AtomicReference<>();
227 // if remappingFunction throws an exception, return the exception.
228 try {
229 computedValue.set(remappingFunction.apply(key, existingValue));
230 } catch (Exception e) {
231 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
232 future.completeExceptionally(e);
233 return future;
234 }
235 if (computedValue.get() == null && r1 == null) {
236 return CompletableFuture.completedFuture(null);
237 }
238 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
239 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800240 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800241 computedValue.get(),
242 valueMatch,
243 versionMatch))
244 .whenComplete((r, e) -> throwIfLocked(r.status()))
245 .thenApply(v -> v.newValue());
246 });
247 }
248
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800249 @Override
250 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800251 if (mapEventListeners.isEmpty()) {
252 return submit(new Listen()).thenRun(() -> mapEventListeners.add(listener));
253 } else {
Madan Jampani40f022e2016-03-02 21:35:14 -0800254 mapEventListeners.add(listener);
255 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800256 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800257 }
258
259 @Override
260 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
261 if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800262 return submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800263 }
264 return CompletableFuture.completedFuture(null);
265 }
266
267 private void throwIfLocked(MapEntryUpdateResult.Status status) {
268 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
269 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
270 }
271 }
272
Madan Jampanicadd70b2016-02-08 13:45:43 -0800273 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800274 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800275 return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800276 }
277
278 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800279 public CompletableFuture<Void> commit(TransactionId transactionId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800280 return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800281 }
282
283 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800284 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800285 return submit(new TransactionRollback(transactionId))
Madan Jampani74da78b2016-02-09 21:18:36 -0800286 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800287 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800288}