blob: 3fbdd2fbfa0b651db8707ded88d313d62295093b [file] [log] [blame]
Madan Jampani7c521002015-03-23 12:23:01 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.store.consistent.impl;
18
19import static com.google.common.base.Preconditions.*;
Madan Jampani50589ac2015-06-08 11:38:46 -070020import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani7c521002015-03-23 12:23:01 -070021
22import java.util.Collection;
23import java.util.Map;
24import java.util.Map.Entry;
Madan Jampani346d4f52015-05-04 11:09:39 -070025import java.util.Objects;
Madan Jampani7c521002015-03-23 12:23:01 -070026import java.util.concurrent.CompletableFuture;
Madan Jampani50589ac2015-06-08 11:38:46 -070027import java.util.concurrent.CopyOnWriteArraySet;
Flavio Castro5e1f1292015-07-23 17:14:09 -070028import java.util.concurrent.TimeUnit;
Madan Jampani346d4f52015-05-04 11:09:39 -070029import java.util.concurrent.atomic.AtomicReference;
30import java.util.function.BiFunction;
Madan Jampani50589ac2015-06-08 11:38:46 -070031import java.util.function.Consumer;
Madan Jampani346d4f52015-05-04 11:09:39 -070032import java.util.function.Function;
33import java.util.function.Predicate;
Madan Jampani7c521002015-03-23 12:23:01 -070034import java.util.stream.Collectors;
35import java.util.Set;
36
Flavio Castro5e1f1292015-07-23 17:14:09 -070037import com.codahale.metrics.Timer;
38import org.onlab.metrics.MetricsComponent;
39import org.onlab.metrics.MetricsFeature;
40import org.onlab.metrics.MetricsService;
41import org.onlab.osgi.DefaultServiceDirectory;
Madan Jampani7c521002015-03-23 12:23:01 -070042import org.onlab.util.HexString;
Madan Jampani648451f2015-07-21 22:09:05 -070043import org.onlab.util.SharedExecutors;
Madan Jampani346d4f52015-05-04 11:09:39 -070044import org.onlab.util.Tools;
Madan Jampanie8af1cc2015-06-23 14:23:31 -070045import org.onosproject.core.ApplicationId;
Madan Jampani648451f2015-07-21 22:09:05 -070046
47import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP;
48
Madan Jampani7c521002015-03-23 12:23:01 -070049import org.onosproject.store.service.AsyncConsistentMap;
Madan Jampanibff6d8f2015-03-31 16:53:47 -070050import org.onosproject.store.service.ConsistentMapException;
Madan Jampani50589ac2015-06-08 11:38:46 -070051import org.onosproject.store.service.MapEvent;
52import org.onosproject.store.service.MapEventListener;
Madan Jampani7c521002015-03-23 12:23:01 -070053import org.onosproject.store.service.Serializer;
54import org.onosproject.store.service.Versioned;
Madan Jampani50589ac2015-06-08 11:38:46 -070055import org.slf4j.Logger;
Madan Jampani7c521002015-03-23 12:23:01 -070056
57import com.google.common.cache.CacheBuilder;
58import com.google.common.cache.CacheLoader;
59import com.google.common.cache.LoadingCache;
Madan Jampani7804c992015-07-20 13:20:19 -070060import com.google.common.collect.Maps;
Madan Jampani7c521002015-03-23 12:23:01 -070061
62/**
63 * AsyncConsistentMap implementation that is backed by a Raft consensus
64 * based database.
65 *
66 * @param <K> type of key.
67 * @param <V> type of value.
68 */
69public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
70
71 private final String name;
Madan Jampanie8af1cc2015-06-23 14:23:31 -070072 private final ApplicationId applicationId;
Madan Jampanif1b8e172015-03-23 11:42:02 -070073 private final Database database;
Madan Jampani7c521002015-03-23 12:23:01 -070074 private final Serializer serializer;
Madan Jampani02b7fb82015-05-01 13:01:20 -070075 private final boolean readOnly;
Madan Jampanie8af1cc2015-06-23 14:23:31 -070076 private final boolean purgeOnUninstall;
Madan Jampani50589ac2015-06-08 11:38:46 -070077 private final Consumer<MapEvent<K, V>> eventPublisher;
78
Flavio Castro5e1f1292015-07-23 17:14:09 -070079 private final MetricsService metricsService;
80 private final MetricsComponent metricsComponent;
81 private final MetricsFeature metricsFeature;
82 private final Map<String, Timer> perMapOpTimers = Maps.newConcurrentMap();
83 private final Map<String, Timer> perOpTimers = Maps.newConcurrentMap();
84 private final Timer cMapTimer;
85 private final Timer perMapTimer;
86 private final MetricsFeature wildcard;
87
88 private static final String COMPONENT_NAME = "consistentMap";
89 private static final String SIZE = "size";
90 private static final String IS_EMPTY = "isEmpty";
91 private static final String CONTAINS_KEY = "containsKey";
92 private static final String CONTAINS_VALUE = "containsValue";
93 private static final String GET = "get";
94 private static final String COMPUTE_IF = "computeIf";
95 private static final String PUT = "put";
96 private static final String PUT_AND_GET = "putAndGet";
97 private static final String PUT_IF_ABSENT = "putIfAbsent";
98 private static final String REMOVE = "remove";
99 private static final String CLEAR = "clear";
100 private static final String KEY_SET = "keySet";
101 private static final String VALUES = "values";
102 private static final String ENTRY_SET = "entrySet";
103 private static final String REPLACE = "replace";
104 private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
105
106
Madan Jampani50589ac2015-06-08 11:38:46 -0700107 private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
108
109 private final Logger log = getLogger(getClass());
Madan Jampani7c521002015-03-23 12:23:01 -0700110
111 private static final String ERROR_NULL_KEY = "Key cannot be null";
112 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
113
114 private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
115 .softValues()
116 .build(new CacheLoader<K, String>() {
117
118 @Override
119 public String load(K key) {
120 return HexString.toHexString(serializer.encode(key));
121 }
122 });
123
124 protected K dK(String key) {
125 return serializer.decode(HexString.fromHexString(key));
126 }
127
128 public DefaultAsyncConsistentMap(String name,
Madan Jampanie8af1cc2015-06-23 14:23:31 -0700129 ApplicationId applicationId,
Madan Jampanif1b8e172015-03-23 11:42:02 -0700130 Database database,
Madan Jampani02b7fb82015-05-01 13:01:20 -0700131 Serializer serializer,
Madan Jampani50589ac2015-06-08 11:38:46 -0700132 boolean readOnly,
Madan Jampanie8af1cc2015-06-23 14:23:31 -0700133 boolean purgeOnUninstall,
Madan Jampani50589ac2015-06-08 11:38:46 -0700134 Consumer<MapEvent<K, V>> eventPublisher) {
Madan Jampani7c521002015-03-23 12:23:01 -0700135 this.name = checkNotNull(name, "map name cannot be null");
Madan Jampanie8af1cc2015-06-23 14:23:31 -0700136 this.applicationId = applicationId;
Madan Jampanif1b8e172015-03-23 11:42:02 -0700137 this.database = checkNotNull(database, "database cannot be null");
Madan Jampani7c521002015-03-23 12:23:01 -0700138 this.serializer = checkNotNull(serializer, "serializer cannot be null");
Madan Jampani02b7fb82015-05-01 13:01:20 -0700139 this.readOnly = readOnly;
Madan Jampanie8af1cc2015-06-23 14:23:31 -0700140 this.purgeOnUninstall = purgeOnUninstall;
Madan Jampani50589ac2015-06-08 11:38:46 -0700141 this.eventPublisher = eventPublisher;
Madan Jampani648451f2015-07-21 22:09:05 -0700142 this.database.registerConsumer(update -> {
143 SharedExecutors.getSingleThreadExecutor().execute(() -> {
144 if (update.target() == MAP) {
145 Result<UpdateResult<String, byte[]>> result = update.output();
146 if (result.success() && result.value().mapName().equals(name)) {
147 MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
148 notifyLocalListeners(mapEvent);
149 }
150 }
151 });
152 });
Flavio Castro5e1f1292015-07-23 17:14:09 -0700153 this.metricsService = DefaultServiceDirectory.getService(MetricsService.class);
154 this.metricsComponent = metricsService.registerComponent(COMPONENT_NAME);
155 this.metricsFeature = metricsComponent.registerFeature(name);
156 this.wildcard = metricsComponent.registerFeature("*");
157 this.perMapTimer = metricsService.createTimer(metricsComponent, metricsFeature, "*");
158 this.cMapTimer = metricsService.createTimer(metricsComponent, wildcard, "*");
159
Madan Jampani50589ac2015-06-08 11:38:46 -0700160 }
161
162 /**
163 * Returns this map name.
164 * @return map name
165 */
166 public String name() {
167 return name;
168 }
169
170 /**
171 * Returns the serializer for map entries.
172 * @return map entry serializer
173 */
174 public Serializer serializer() {
175 return serializer;
Madan Jampani7c521002015-03-23 12:23:01 -0700176 }
177
Madan Jampanie8af1cc2015-06-23 14:23:31 -0700178 /**
179 * Returns the applicationId owning this map.
180 * @return application Id
181 */
182 public ApplicationId applicationId() {
183 return applicationId;
184 }
185
186 /**
187 * Returns whether the map entries should be purged when the application
188 * owning it is uninstalled.
189 * @return true is map needs to cleared on app uninstall; false otherwise
190 */
191 public boolean purgeOnUninstall() {
192 return purgeOnUninstall;
193 }
194
Madan Jampani7c521002015-03-23 12:23:01 -0700195 @Override
196 public CompletableFuture<Integer> size() {
Flavio Castro5e1f1292015-07-23 17:14:09 -0700197 final OperationTimer timer = startTimer(SIZE);
198 return database.mapSize(name)
199 .whenComplete((r, e) -> timer.stop());
Madan Jampani7c521002015-03-23 12:23:01 -0700200 }
201
202 @Override
203 public CompletableFuture<Boolean> isEmpty() {
Flavio Castro5e1f1292015-07-23 17:14:09 -0700204 final OperationTimer timer = startTimer(IS_EMPTY);
205 return database.mapIsEmpty(name)
206 .whenComplete((r, e) -> timer.stop());
Madan Jampani7c521002015-03-23 12:23:01 -0700207 }
208
209 @Override
210 public CompletableFuture<Boolean> containsKey(K key) {
211 checkNotNull(key, ERROR_NULL_KEY);
Flavio Castro5e1f1292015-07-23 17:14:09 -0700212 final OperationTimer timer = startTimer(CONTAINS_KEY);
213 return database.mapContainsKey(name, keyCache.getUnchecked(key))
214 .whenComplete((r, e) -> timer.stop());
Madan Jampani7c521002015-03-23 12:23:01 -0700215 }
216
217 @Override
218 public CompletableFuture<Boolean> containsValue(V value) {
219 checkNotNull(value, ERROR_NULL_VALUE);
Flavio Castro5e1f1292015-07-23 17:14:09 -0700220 final OperationTimer timer = startTimer(CONTAINS_VALUE);
221 return database.mapContainsValue(name, serializer.encode(value))
222 .whenComplete((r, e) -> timer.stop());
Madan Jampani7c521002015-03-23 12:23:01 -0700223 }
224
225 @Override
226 public CompletableFuture<Versioned<V>> get(K key) {
227 checkNotNull(key, ERROR_NULL_KEY);
Flavio Castro5e1f1292015-07-23 17:14:09 -0700228 final OperationTimer timer = startTimer(GET);
Madan Jampani7804c992015-07-20 13:20:19 -0700229 return database.mapGet(name, keyCache.getUnchecked(key))
Flavio Castro5e1f1292015-07-23 17:14:09 -0700230 .whenComplete((r, e) -> timer.stop())
231 .thenApply(v -> v != null ? v.map(serializer::decode) : null);
Madan Jampani7c521002015-03-23 12:23:01 -0700232 }
233
234 @Override
Madan Jampani346d4f52015-05-04 11:09:39 -0700235 public CompletableFuture<Versioned<V>> computeIfAbsent(K key,
236 Function<? super K, ? extends V> mappingFunction) {
Madan Jampani7804c992015-07-20 13:20:19 -0700237 checkNotNull(key, ERROR_NULL_KEY);
238 checkNotNull(mappingFunction, "Mapping function cannot be null");
Flavio Castro5e1f1292015-07-23 17:14:09 -0700239 final OperationTimer timer = startTimer(COMPUTE_IF_ABSENT);
240 return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key))
241 .whenComplete((r, e) -> timer.stop())
242 .thenApply(v -> v.newValue());
Madan Jampani346d4f52015-05-04 11:09:39 -0700243 }
244
245 @Override
246 public CompletableFuture<Versioned<V>> computeIfPresent(K key,
247 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
248 return computeIf(key, Objects::nonNull, remappingFunction);
249 }
250
251 @Override
252 public CompletableFuture<Versioned<V>> compute(K key,
253 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
254 return computeIf(key, v -> true, remappingFunction);
255 }
256
257 @Override
258 public CompletableFuture<Versioned<V>> computeIf(K key,
259 Predicate<? super V> condition,
260 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
261 checkNotNull(key, ERROR_NULL_KEY);
262 checkNotNull(condition, "predicate function cannot be null");
263 checkNotNull(remappingFunction, "Remapping function cannot be null");
Flavio Castro5e1f1292015-07-23 17:14:09 -0700264 final OperationTimer timer = startTimer(COMPUTE_IF);
Madan Jampani346d4f52015-05-04 11:09:39 -0700265 return get(key).thenCompose(r1 -> {
266 V existingValue = r1 == null ? null : r1.value();
267 // if the condition evaluates to false, return existing value.
268 if (!condition.test(existingValue)) {
269 return CompletableFuture.completedFuture(r1);
270 }
271
272 AtomicReference<V> computedValue = new AtomicReference<>();
273 // if remappingFunction throws an exception, return the exception.
274 try {
275 computedValue.set(remappingFunction.apply(key, existingValue));
276 } catch (Exception e) {
277 return Tools.exceptionalFuture(e);
278 }
Madan Jampani7804c992015-07-20 13:20:19 -0700279 if (computedValue.get() == null && r1 == null) {
280 return CompletableFuture.completedFuture(null);
Madan Jampani346d4f52015-05-04 11:09:39 -0700281 }
Madan Jampani7804c992015-07-20 13:20:19 -0700282 Match<V> valueMatcher = r1 == null ? Match.ifNull() : Match.any();
283 Match<Long> versionMatcher = r1 == null ? Match.any() : Match.ifValue(r1.version());
284 return updateAndGet(key, valueMatcher, versionMatcher, computedValue.get())
Flavio Castro5e1f1292015-07-23 17:14:09 -0700285 .whenComplete((r, e) -> timer.stop())
Madan Jampani7804c992015-07-20 13:20:19 -0700286 .thenApply(v -> {
287 if (v.updated()) {
288 return v.newValue();
289 } else {
290 throw new ConsistentMapException.ConcurrentModification();
291 }
292 });
293 });
Madan Jampani346d4f52015-05-04 11:09:39 -0700294 }
295
296 @Override
Madan Jampani7c521002015-03-23 12:23:01 -0700297 public CompletableFuture<Versioned<V>> put(K key, V value) {
298 checkNotNull(key, ERROR_NULL_KEY);
299 checkNotNull(value, ERROR_NULL_VALUE);
Flavio Castro5e1f1292015-07-23 17:14:09 -0700300 final OperationTimer timer = startTimer(PUT);
301 return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue())
302 .whenComplete((r, e) -> timer.stop());
Madan Jampani346d4f52015-05-04 11:09:39 -0700303 }
304
305 @Override
306 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
307 checkNotNull(key, ERROR_NULL_KEY);
308 checkNotNull(value, ERROR_NULL_VALUE);
Flavio Castro5e1f1292015-07-23 17:14:09 -0700309 final OperationTimer timer = startTimer(PUT_AND_GET);
310 return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue())
311 .whenComplete((r, e) -> timer.stop());
Madan Jampani7c521002015-03-23 12:23:01 -0700312 }
313
314 @Override
315 public CompletableFuture<Versioned<V>> remove(K key) {
316 checkNotNull(key, ERROR_NULL_KEY);
Flavio Castro5e1f1292015-07-23 17:14:09 -0700317 final OperationTimer timer = startTimer(REMOVE);
318 return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue())
319 .whenComplete((r, e) -> timer.stop());
Madan Jampani7c521002015-03-23 12:23:01 -0700320 }
321
322 @Override
323 public CompletableFuture<Void> clear() {
Madan Jampani02b7fb82015-05-01 13:01:20 -0700324 checkIfUnmodifiable();
Flavio Castro5e1f1292015-07-23 17:14:09 -0700325 final OperationTimer timer = startTimer(CLEAR);
326 return database.mapClear(name).thenApply(this::unwrapResult)
327 .whenComplete((r, e) -> timer.stop());
Madan Jampani7c521002015-03-23 12:23:01 -0700328 }
329
330 @Override
331 public CompletableFuture<Set<K>> keySet() {
Flavio Castro5e1f1292015-07-23 17:14:09 -0700332 final OperationTimer timer = startTimer(KEY_SET);
Madan Jampani7804c992015-07-20 13:20:19 -0700333 return database.mapKeySet(name)
Madan Jampani7c521002015-03-23 12:23:01 -0700334 .thenApply(s -> s
Flavio Castro5e1f1292015-07-23 17:14:09 -0700335 .stream()
336 .map(this::dK)
337 .collect(Collectors.toSet()))
338 .whenComplete((r, e) -> timer.stop());
Madan Jampani7c521002015-03-23 12:23:01 -0700339 }
340
341 @Override
342 public CompletableFuture<Collection<Versioned<V>>> values() {
Flavio Castro5e1f1292015-07-23 17:14:09 -0700343 final OperationTimer timer = startTimer(VALUES);
344 return database.mapValues(name)
345 .whenComplete((r, e) -> timer.stop())
346 .thenApply(c -> c
347 .stream()
348 .map(v -> v.<V>map(serializer::decode))
349 .collect(Collectors.toList()));
Madan Jampani7c521002015-03-23 12:23:01 -0700350 }
351
352 @Override
353 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
Flavio Castro5e1f1292015-07-23 17:14:09 -0700354 final OperationTimer timer = startTimer(ENTRY_SET);
355 return database.mapEntrySet(name)
356 .whenComplete((r, e) -> timer.stop())
357 .thenApply(s -> s
358 .stream()
359 .map(this::mapRawEntry)
360 .collect(Collectors.toSet()));
Madan Jampani7c521002015-03-23 12:23:01 -0700361 }
362
363 @Override
364 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
365 checkNotNull(key, ERROR_NULL_KEY);
366 checkNotNull(value, ERROR_NULL_VALUE);
Flavio Castro5e1f1292015-07-23 17:14:09 -0700367 final OperationTimer timer = startTimer(PUT_IF_ABSENT);
368 return updateAndGet(key, Match.ifNull(), Match.any(), value)
369 .whenComplete((r, e) -> timer.stop())
370 .thenApply(v -> v.oldValue());
Madan Jampani7c521002015-03-23 12:23:01 -0700371 }
372
373 @Override
374 public CompletableFuture<Boolean> remove(K key, V value) {
375 checkNotNull(key, ERROR_NULL_KEY);
376 checkNotNull(value, ERROR_NULL_VALUE);
Flavio Castro5e1f1292015-07-23 17:14:09 -0700377 final OperationTimer timer = startTimer(REMOVE);
378 return updateAndGet(key, Match.ifValue(value), Match.any(), null)
379 .whenComplete((r, e) -> timer.stop())
380 .thenApply(v -> v.updated());
Madan Jampani7c521002015-03-23 12:23:01 -0700381 }
382
383 @Override
384 public CompletableFuture<Boolean> remove(K key, long version) {
385 checkNotNull(key, ERROR_NULL_KEY);
Flavio Castro5e1f1292015-07-23 17:14:09 -0700386 final OperationTimer timer = startTimer(REMOVE);
387 return updateAndGet(key, Match.any(), Match.ifValue(version), null)
388 .whenComplete((r, e) -> timer.stop())
389 .thenApply(v -> v.updated());
Madan Jampani7c521002015-03-23 12:23:01 -0700390 }
391
392 @Override
393 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
394 checkNotNull(key, ERROR_NULL_KEY);
Madan Jampani7804c992015-07-20 13:20:19 -0700395 checkNotNull(oldValue, ERROR_NULL_VALUE);
Madan Jampani7c521002015-03-23 12:23:01 -0700396 checkNotNull(newValue, ERROR_NULL_VALUE);
Flavio Castro5e1f1292015-07-23 17:14:09 -0700397 final OperationTimer timer = startTimer(REPLACE);
398 return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue)
399 .whenComplete((r, e) -> timer.stop())
400 .thenApply(v -> v.updated());
Madan Jampani7c521002015-03-23 12:23:01 -0700401 }
402
403 @Override
404 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
Flavio Castro5e1f1292015-07-23 17:14:09 -0700405 final OperationTimer timer = startTimer(REPLACE);
406 return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue)
407 .whenComplete((r, e) -> timer.stop())
408 .thenApply(v -> v.updated());
Madan Jampani346d4f52015-05-04 11:09:39 -0700409 }
410
Madan Jampani7804c992015-07-20 13:20:19 -0700411 private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
412 return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode));
413 }
414
415 private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key,
416 Match<V> oldValueMatch,
417 Match<Long> oldVersionMatch,
418 V value) {
Madan Jampani02b7fb82015-05-01 13:01:20 -0700419 checkIfUnmodifiable();
Madan Jampani7804c992015-07-20 13:20:19 -0700420 return database.mapUpdate(name,
Flavio Castro5e1f1292015-07-23 17:14:09 -0700421 keyCache.getUnchecked(key),
422 oldValueMatch.map(serializer::encode),
423 oldVersionMatch,
424 value == null ? null : serializer.encode(value))
Madan Jampani7804c992015-07-20 13:20:19 -0700425 .thenApply(this::unwrapResult)
426 .thenApply(r -> r.<K, V>map(this::dK, serializer::decode))
Madan Jampani648451f2015-07-21 22:09:05 -0700427 .whenComplete((r, e) -> {
428 if (r != null && e == null && !database.hasChangeNotificationSupport()) {
429 notifyListeners(r.toMapEvent());
430 }
431 });
Madan Jampani7c521002015-03-23 12:23:01 -0700432 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700433
434 private <T> T unwrapResult(Result<T> result) {
435 if (result.status() == Result.Status.LOCKED) {
436 throw new ConsistentMapException.ConcurrentModification();
437 } else if (result.success()) {
438 return result.value();
439 } else {
440 throw new IllegalStateException("Must not be here");
441 }
442 }
Madan Jampani02b7fb82015-05-01 13:01:20 -0700443
444 private void checkIfUnmodifiable() {
445 if (readOnly) {
446 throw new UnsupportedOperationException();
447 }
448 }
Madan Jampani50589ac2015-06-08 11:38:46 -0700449
450 @Override
451 public void addListener(MapEventListener<K, V> listener) {
452 listeners.add(listener);
453 }
454
455 @Override
456 public void removeListener(MapEventListener<K, V> listener) {
457 listeners.remove(listener);
458 }
459
460 protected void notifyListeners(MapEvent<K, V> event) {
461 try {
462 if (event != null) {
463 notifyLocalListeners(event);
464 notifyRemoteListeners(event);
465 }
466 } catch (Exception e) {
467 log.warn("Failure notifying listeners about {}", event, e);
468 }
469 }
470
471 protected void notifyLocalListeners(MapEvent<K, V> event) {
Madan Jampani648451f2015-07-21 22:09:05 -0700472 if (event != null) {
473 listeners.forEach(listener -> listener.event(event));
474 }
Madan Jampani50589ac2015-06-08 11:38:46 -0700475 }
476
477 protected void notifyRemoteListeners(MapEvent<K, V> event) {
478 if (eventPublisher != null) {
479 eventPublisher.accept(event);
480 }
481 }
Flavio Castro5e1f1292015-07-23 17:14:09 -0700482
483 private OperationTimer startTimer(String op) {
484 //check if timer exist, if it doesn't creates it
485 final Timer currTimer = perMapOpTimers.computeIfAbsent(op, timer ->
486 metricsService.createTimer(metricsComponent, metricsFeature, op));
487 perOpTimers.computeIfAbsent(op, timer -> metricsService.createTimer(metricsComponent, wildcard, op));
488 //starts timer
489 return new OperationTimer(currTimer.time(), op);
490 }
491
492 private class OperationTimer {
493 private final Timer.Context context;
494 private final String operation;
495
496 public OperationTimer(Timer.Context context, String operation) {
497 this.context = context;
498 this.operation = operation;
499 }
500
501 public void stop() {
502 //Stop and updates timer with specific measurements per map, per operation
503 final long time = context.stop();
504 //updates timer with aggregated measurements per map
505 perOpTimers.get(operation).update(time, TimeUnit.NANOSECONDS);
506 //updates timer with aggregated measurements per map
507 perMapTimer.update(time, TimeUnit.NANOSECONDS);
508 //updates timer with aggregated measurements per all Consistent Maps
509 cMapTimer.update(time, TimeUnit.NANOSECONDS);
510 }
511 }
Madan Jampani7c521002015-03-23 12:23:01 -0700512}