blob: 41f6e25511c62523df43e7d0eb3994256ecd408c [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5ddaf192016-04-04 07:39:31 -070018import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080019import io.atomix.copycat.client.CopycatClient;
Madan Jampani65f24bb2016-03-15 15:16:18 -070020import io.atomix.resource.AbstractResource;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080021import io.atomix.resource.ResourceTypeInfo;
22
23import java.util.Collection;
24import java.util.ConcurrentModificationException;
Madan Jampanifc981772016-02-16 09:46:42 -080025import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080026import java.util.Map.Entry;
Madan Jampani65f24bb2016-03-15 15:16:18 -070027import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080028import java.util.Set;
29import java.util.concurrent.CompletableFuture;
30import java.util.concurrent.atomic.AtomicReference;
31import java.util.function.BiFunction;
32import java.util.function.Predicate;
33
34import org.onlab.util.Match;
Madan Jampanicadd70b2016-02-08 13:45:43 -080035import org.onosproject.store.primitives.TransactionId;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080036import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
37import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
38import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
39import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
40import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
41import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
42import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
43import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
44import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
45import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
46import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
47import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
48import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
49import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
50import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080051import org.onosproject.store.service.AsyncConsistentMap;
52import org.onosproject.store.service.MapEvent;
53import org.onosproject.store.service.MapEventListener;
Madan Jampani74da78b2016-02-09 21:18:36 -080054import org.onosproject.store.service.MapTransaction;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055import org.onosproject.store.service.Versioned;
Madan Jampani5ddaf192016-04-04 07:39:31 -070056import org.slf4j.Logger;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080057
58import com.google.common.collect.Sets;
59
60/**
61 * Distributed resource providing the {@link AsyncConsistentMap} primitive.
62 */
Madan Jampani65f24bb2016-03-15 15:16:18 -070063@ResourceTypeInfo(id = -151, factory = AtomixConsistentMapFactory.class)
64public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
Madan Jampani74da78b2016-02-09 21:18:36 -080065 implements AsyncConsistentMap<String, byte[]> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080066
Madan Jampani5ddaf192016-04-04 07:39:31 -070067 private final Logger log = getLogger(getClass());
Madan Jampani5e5b3d62016-02-01 16:03:33 -080068 private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
69
Madan Jampanifc981772016-02-16 09:46:42 -080070 public static final String CHANGE_SUBJECT = "changeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080071
Madan Jampani65f24bb2016-03-15 15:16:18 -070072 public AtomixConsistentMap(CopycatClient client, Properties properties) {
73 super(client, properties);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080074 }
75
76 @Override
77 public String name() {
78 return null;
79 }
80
81 @Override
82 public CompletableFuture<AtomixConsistentMap> open() {
83 return super.open().thenApply(result -> {
Madan Jampani3a9911c2016-02-21 11:25:45 -080084 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080085 return result;
86 });
87 }
88
Madan Jampanifc981772016-02-16 09:46:42 -080089 private void handleEvent(List<MapEvent<String, byte[]>> events) {
Madan Jampani5ddaf192016-04-04 07:39:31 -070090 events.forEach(event -> mapEventListeners.forEach(listener -> {
91 try {
92 listener.event(event);
93 } catch (Exception e) {
94 log.warn("Error processing map event", e);
95 }
96 }));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080097 }
98
99 @Override
100 public CompletableFuture<Boolean> isEmpty() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800101 return submit(new IsEmpty());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800102 }
103
104 @Override
105 public CompletableFuture<Integer> size() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800106 return submit(new Size());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800107 }
108
109 @Override
110 public CompletableFuture<Boolean> containsKey(String key) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800111 return submit(new ContainsKey(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800112 }
113
114 @Override
115 public CompletableFuture<Boolean> containsValue(byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800116 return submit(new ContainsValue(value));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800117 }
118
119 @Override
120 public CompletableFuture<Versioned<byte[]>> get(String key) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800121 return submit(new Get(key));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800122 }
123
124 @Override
125 public CompletableFuture<Set<String>> keySet() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800126 return submit(new KeySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800127 }
128
129 @Override
130 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800131 return submit(new Values());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800132 }
133
134 @Override
135 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800136 return submit(new EntrySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800137 }
138
139 @Override
140 @SuppressWarnings("unchecked")
141 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800142 return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800143 .whenComplete((r, e) -> throwIfLocked(r.status()))
144 .thenApply(v -> v.oldValue());
145 }
146
147 @Override
148 @SuppressWarnings("unchecked")
149 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800150 return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800151 .whenComplete((r, e) -> throwIfLocked(r.status()))
152 .thenApply(v -> v.newValue());
153 }
154
155 @Override
156 @SuppressWarnings("unchecked")
157 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800158 return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800159 .whenComplete((r, e) -> throwIfLocked(r.status()))
160 .thenApply(v -> v.oldValue());
161 }
162 @Override
163 @SuppressWarnings("unchecked")
164 public CompletableFuture<Versioned<byte[]>> remove(String key) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800165 return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800166 .whenComplete((r, e) -> throwIfLocked(r.status()))
167 .thenApply(v -> v.oldValue());
168 }
169
170 @Override
171 @SuppressWarnings("unchecked")
172 public CompletableFuture<Boolean> remove(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800173 return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800174 .whenComplete((r, e) -> throwIfLocked(r.status()))
175 .thenApply(v -> v.updated());
176 }
177
178 @Override
179 @SuppressWarnings("unchecked")
180 public CompletableFuture<Boolean> remove(String key, long version) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800181 return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800182 .whenComplete((r, e) -> throwIfLocked(r.status()))
183 .thenApply(v -> v.updated());
184 }
185
186 @Override
187 @SuppressWarnings("unchecked")
188 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800189 return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800190 .whenComplete((r, e) -> throwIfLocked(r.status()))
191 .thenApply(v -> v.oldValue());
192 }
193
194 @Override
195 @SuppressWarnings("unchecked")
196 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800197 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800198 newValue,
199 Match.ifValue(oldValue),
200 Match.ANY))
201 .whenComplete((r, e) -> throwIfLocked(r.status()))
202 .thenApply(v -> v.updated());
203 }
204
205 @Override
206 @SuppressWarnings("unchecked")
207 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800208 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800209 newValue,
210 Match.ANY,
211 Match.ifValue(oldVersion)))
212 .whenComplete((r, e) -> throwIfLocked(r.status()))
213 .thenApply(v -> v.updated());
214 }
215
216 @Override
217 public CompletableFuture<Void> clear() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800218 return submit(new Clear())
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800219 .whenComplete((r, e) -> throwIfLocked(r))
220 .thenApply(v -> null);
221 }
222
223 @Override
224 @SuppressWarnings("unchecked")
225 public CompletableFuture<Versioned<byte[]>> computeIf(String key,
226 Predicate<? super byte[]> condition,
227 BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
228 return get(key).thenCompose(r1 -> {
229 byte[] existingValue = r1 == null ? null : r1.value();
230 // if the condition evaluates to false, return existing value.
231 if (!condition.test(existingValue)) {
232 return CompletableFuture.completedFuture(r1);
233 }
234
235 AtomicReference<byte[]> computedValue = new AtomicReference<>();
236 // if remappingFunction throws an exception, return the exception.
237 try {
238 computedValue.set(remappingFunction.apply(key, existingValue));
239 } catch (Exception e) {
240 CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
241 future.completeExceptionally(e);
242 return future;
243 }
244 if (computedValue.get() == null && r1 == null) {
245 return CompletableFuture.completedFuture(null);
246 }
247 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
248 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800249 return submit(new UpdateAndGet(key,
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800250 computedValue.get(),
251 valueMatch,
252 versionMatch))
253 .whenComplete((r, e) -> throwIfLocked(r.status()))
254 .thenApply(v -> v.newValue());
255 });
256 }
257
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800258 @Override
259 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800260 if (mapEventListeners.isEmpty()) {
261 return submit(new Listen()).thenRun(() -> mapEventListeners.add(listener));
262 } else {
Madan Jampani40f022e2016-03-02 21:35:14 -0800263 mapEventListeners.add(listener);
264 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800265 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800266 }
267
268 @Override
269 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
270 if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800271 return submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800272 }
273 return CompletableFuture.completedFuture(null);
274 }
275
276 private void throwIfLocked(MapEntryUpdateResult.Status status) {
277 if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
278 throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
279 }
280 }
281
Madan Jampanicadd70b2016-02-08 13:45:43 -0800282 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800283 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800284 return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800285 }
286
287 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800288 public CompletableFuture<Void> commit(TransactionId transactionId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800289 return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800290 }
291
292 @Override
Madan Jampani74da78b2016-02-09 21:18:36 -0800293 public CompletableFuture<Void> rollback(TransactionId transactionId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800294 return submit(new TransactionRollback(transactionId))
Madan Jampani74da78b2016-02-09 21:18:36 -0800295 .thenApply(v -> null);
Madan Jampanicadd70b2016-02-08 13:45:43 -0800296 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800297}