/*
 * Copyright 2015-2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.impl;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.SetEventListener;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

37/**
38 * Implementation of {@link AsyncDistributedSet}.
39 *
40 * @param <E> set entry type
41 */
42public class DefaultAsyncDistributedSet<E> implements AsyncDistributedSet<E> {
43
44 private static final String CONTAINS = "contains";
45 private static final String PRIMITIVE_NAME = "distributedSet";
46 private static final String SIZE = "size";
47 private static final String IS_EMPTY = "isEmpty";
48 private static final String ADD = "add";
49 private static final String REMOVE = "remove";
50 private static final String CONTAINS_ALL = "containsAll";
51 private static final String ADD_ALL = "addAll";
52 private static final String RETAIN_ALL = "retainAll";
53 private static final String REMOVE_ALL = "removeAll";
54 private static final String CLEAR = "clear";
55 private static final String GET_AS_IMMUTABLE_SET = "getAsImmutableSet";
56
57 private final String name;
58 private final AsyncConsistentMap<E, Boolean> backingMap;
59 private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
60 private final MeteringAgent monitor;
61
62 public DefaultAsyncDistributedSet(AsyncConsistentMap<E, Boolean> backingMap, String name, boolean meteringEnabled) {
63 this.backingMap = backingMap;
64 this.name = name;
65 monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
66 }
67
68 @Override
69 public String name() {
70 return name;
71 }
72
73 @Override
74 public CompletableFuture<Integer> size() {
75 final MeteringAgent.Context timer = monitor.startTimer(SIZE);
76 return backingMap.size().whenComplete((r, e) -> timer.stop(null));
77 }
78
79 @Override
80 public CompletableFuture<Boolean> isEmpty() {
81 final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
82 return backingMap.isEmpty().whenComplete((r, e) -> timer.stop(null));
83 }
84
85 @Override
86 public CompletableFuture<Boolean> contains(E element) {
87 final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
88 return backingMap.containsKey(element).whenComplete((r, e) -> timer.stop(null));
89 }
90
91 @Override
92 public CompletableFuture<Boolean> add(E entry) {
93 final MeteringAgent.Context timer = monitor.startTimer(ADD);
94 return backingMap.putIfAbsent(entry, true).thenApply(Objects::isNull).whenComplete((r, e) -> timer.stop(null));
95 }
96
97 @Override
98 public CompletableFuture<Boolean> remove(E entry) {
99 final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
100 return backingMap.remove(entry, true).whenComplete((r, e) -> timer.stop(null));
101 }
102
103 @Override
104 public CompletableFuture<Boolean> containsAll(Collection<? extends E> c) {
105 final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
106 return Tools.allOf(c.stream().map(this::contains).collect(Collectors.toList())).thenApply(v ->
107 v.stream().reduce(Boolean::logicalAnd).orElse(true)).whenComplete((r, e) -> timer.stop(null));
108 }
109
110 @Override
111 public CompletableFuture<Boolean> addAll(Collection<? extends E> c) {
112 final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
113 return Tools.allOf(c.stream().map(this::add).collect(Collectors.toList())).thenApply(v ->
114 v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(null));
115 }
116
117 @Override
118 public CompletableFuture<Boolean> retainAll(Collection<? extends E> c) {
119 final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
120 return backingMap.keySet().thenApply(set -> Sets.difference(set, Sets.newHashSet(c)))
121 .thenCompose(this::removeAll)
122 .whenComplete((r, e) -> timer.stop(null));
123 }
124
125 @Override
126 public CompletableFuture<Boolean> removeAll(Collection<? extends E> c) {
127 final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
128 return Tools.allOf(c.stream().map(this::remove).collect(Collectors.toList())).thenApply(v ->
129 v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(null));
130 }
131
132 @Override
133 public CompletableFuture<Void> clear() {
134 final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
135 return backingMap.clear().whenComplete((r, e) -> timer.stop(null));
136 }
137
138 @Override
139 public CompletableFuture<? extends Set<E>> getAsImmutableSet() {
140 final MeteringAgent.Context timer = monitor.startTimer(GET_AS_IMMUTABLE_SET);
141 return backingMap.keySet().thenApply(s -> ImmutableSet.copyOf(s)).whenComplete((r, e) -> timer.stop(null));
142 }
143
144 @Override
145 public CompletableFuture<Void> addListener(SetEventListener<E> listener) {
146 MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
147 if (mapEvent.type() == MapEvent.Type.INSERT) {
148 listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
149 } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
150 listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
151 }
152 };
153 if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
154 return backingMap.addListener(mapEventListener);
155 }
156 return CompletableFuture.completedFuture(null);
157 }
158
159 @Override
160 public CompletableFuture<Void> removeListener(SetEventListener<E> listener) {
161 MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
162 if (mapEventListener != null) {
163 return backingMap.removeListener(mapEventListener);
164 }
165 return CompletableFuture.completedFuture(null);
166 }
167}