/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.consistent.impl;
17
18import org.onosproject.store.service.AsyncAtomicValue;
19import org.onosproject.store.service.AsyncConsistentMap;
20import org.onosproject.store.service.AtomicValueEvent;
21import org.onosproject.store.service.AtomicValueEventListener;
22import org.onosproject.store.service.MapEvent;
23import org.onosproject.store.service.MapEventListener;
24import org.onosproject.store.service.Versioned;
25
26import java.util.Set;
27import java.util.concurrent.CompletableFuture;
28import java.util.concurrent.CopyOnWriteArraySet;
29
30/**
31 * Default implementation of {@link AsyncAtomicValue}.
32 *
33 * @param <V> value type
34 */
35public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
36
37 private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>();
38 private final AsyncConsistentMap<String, V> valueMap;
39 private final String name;
40 private final MapEventListener<String, V> mapEventListener = new InternalMapEventListener();
41 private final MeteringAgent monitor;
42
43 private static final String COMPONENT_NAME = "atomicValue";
44 private static final String GET = "get";
45 private static final String GET_AND_SET = "getAndSet";
46 private static final String SET = "set";
47 private static final String COMPARE_AND_SET = "compareAndSet";
48
49 public DefaultAsyncAtomicValue(AsyncConsistentMap<String, V> valueMap,
50 String name,
51 boolean meteringEnabled) {
52 this.valueMap = valueMap;
53 this.name = name;
54 this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
55 }
56
57 @Override
58 public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
59 final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
60 CompletableFuture<Boolean> response;
61 if (expect == null) {
62 if (update == null) {
63 response = CompletableFuture.completedFuture(true);
64 }
65 response = valueMap.putIfAbsent(name, update).thenApply(v -> v == null);
66 } else {
67 response = update == null
68 ? valueMap.remove(name, expect)
69 : valueMap.replace(name, expect, update);
70 }
71 return response.whenComplete((r, e) -> newTimer.stop(null));
72 }
73
74 @Override
75 public CompletableFuture<V> get() {
76 final MeteringAgent.Context newTimer = monitor.startTimer(GET);
77 return valueMap.get(name)
78 .thenApply(Versioned::valueOrNull)
79 .whenComplete((r, e) -> newTimer.stop(null));
80 }
81
82 @Override
83 public CompletableFuture<V> getAndSet(V value) {
84 final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET);
85 CompletableFuture<Versioned<V>> previousValue = value == null ?
86 valueMap.remove(name) : valueMap.put(name, value);
87 return previousValue.thenApply(Versioned::valueOrNull)
88 .whenComplete((r, e) -> newTimer.stop(null));
89 }
90
91 @Override
92 public CompletableFuture<Void> set(V value) {
93 final MeteringAgent.Context newTimer = monitor.startTimer(SET);
94 CompletableFuture<Void> previousValue = value == null ?
95 valueMap.remove(name).thenApply(v -> null) : valueMap.put(name, value).thenApply(v -> null);
96 return previousValue.whenComplete((r, e) -> newTimer.stop(null));
97 }
98
99 @Override
100 public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
101 synchronized (listeners) {
102 if (listeners.add(listener)) {
103 if (listeners.size() == 1) {
104 return valueMap.addListener(mapEventListener);
105 }
106 }
107 }
108 return CompletableFuture.completedFuture(null);
109 }
110
111 @Override
112 public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
113 synchronized (listeners) {
114 if (listeners.remove(listener)) {
115 if (listeners.size() == 0) {
116 return valueMap.removeListener(mapEventListener);
117 }
118 }
119 }
120 return CompletableFuture.completedFuture(null);
121 }
122
123 private class InternalMapEventListener implements MapEventListener<String, V> {
124
125 @Override
126 public void event(MapEvent<String, V> mapEvent) {
127 V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : mapEvent.value().value();
128 AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue);
129 listeners.forEach(l -> l.event(atomicValueEvent));
130 }
131 }
132}