/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.consistent.impl;
17
18import org.onosproject.store.service.AsyncAtomicValue;
19import org.onosproject.store.service.AsyncConsistentMap;
20import org.onosproject.store.service.AtomicValueEvent;
21import org.onosproject.store.service.AtomicValueEventListener;
22import org.onosproject.store.service.MapEvent;
23import org.onosproject.store.service.MapEventListener;
24import org.onosproject.store.service.Versioned;
25
26import java.util.Set;
27import java.util.concurrent.CompletableFuture;
28import java.util.concurrent.CopyOnWriteArraySet;
29
30/**
31 * Default implementation of {@link AsyncAtomicValue}.
32 *
33 * @param <V> value type
34 */
35public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
36
37 private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>();
38 private final AsyncConsistentMap<String, V> valueMap;
39 private final String name;
40 private final MapEventListener<String, V> mapEventListener = new InternalMapEventListener();
41 private final MeteringAgent monitor;
42
43 private static final String COMPONENT_NAME = "atomicValue";
44 private static final String GET = "get";
45 private static final String GET_AND_SET = "getAndSet";
46 private static final String SET = "set";
47 private static final String COMPARE_AND_SET = "compareAndSet";
48
49 public DefaultAsyncAtomicValue(AsyncConsistentMap<String, V> valueMap,
50 String name,
51 boolean meteringEnabled) {
52 this.valueMap = valueMap;
53 this.name = name;
54 this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
55 }
56
57 @Override
Madan Jampania090a112016-01-18 16:38:17 -080058 public String name() {
59 return name;
60 }
61
62 @Override
Madan Jampanidfde6ba2016-01-13 21:36:09 -080063 public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
64 final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
65 CompletableFuture<Boolean> response;
66 if (expect == null) {
67 if (update == null) {
68 response = CompletableFuture.completedFuture(true);
69 }
70 response = valueMap.putIfAbsent(name, update).thenApply(v -> v == null);
71 } else {
72 response = update == null
73 ? valueMap.remove(name, expect)
74 : valueMap.replace(name, expect, update);
75 }
76 return response.whenComplete((r, e) -> newTimer.stop(null));
77 }
78
79 @Override
80 public CompletableFuture<V> get() {
81 final MeteringAgent.Context newTimer = monitor.startTimer(GET);
82 return valueMap.get(name)
83 .thenApply(Versioned::valueOrNull)
84 .whenComplete((r, e) -> newTimer.stop(null));
85 }
86
87 @Override
88 public CompletableFuture<V> getAndSet(V value) {
89 final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET);
90 CompletableFuture<Versioned<V>> previousValue = value == null ?
91 valueMap.remove(name) : valueMap.put(name, value);
92 return previousValue.thenApply(Versioned::valueOrNull)
93 .whenComplete((r, e) -> newTimer.stop(null));
94 }
95
96 @Override
97 public CompletableFuture<Void> set(V value) {
98 final MeteringAgent.Context newTimer = monitor.startTimer(SET);
99 CompletableFuture<Void> previousValue = value == null ?
100 valueMap.remove(name).thenApply(v -> null) : valueMap.put(name, value).thenApply(v -> null);
101 return previousValue.whenComplete((r, e) -> newTimer.stop(null));
102 }
103
104 @Override
105 public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
106 synchronized (listeners) {
107 if (listeners.add(listener)) {
108 if (listeners.size() == 1) {
109 return valueMap.addListener(mapEventListener);
110 }
111 }
112 }
113 return CompletableFuture.completedFuture(null);
114 }
115
116 @Override
117 public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
118 synchronized (listeners) {
119 if (listeners.remove(listener)) {
120 if (listeners.size() == 0) {
121 return valueMap.removeListener(mapEventListener);
122 }
123 }
124 }
125 return CompletableFuture.completedFuture(null);
126 }
127
128 private class InternalMapEventListener implements MapEventListener<String, V> {
129
130 @Override
131 public void event(MapEvent<String, V> mapEvent) {
132 V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : mapEvent.value().value();
133 AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue);
134 listeners.forEach(l -> l.event(atomicValueEvent));
135 }
136 }
137}