blob: d79c618e11daf4f93bc8464fb39b21fa8f9c3047 [file] [log] [blame]
Madan Jampania090a112016-01-18 16:38:17 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampania090a112016-01-18 16:38:17 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Madan Jampania090a112016-01-18 16:38:17 -080017
18import java.util.Collection;
19import java.util.Map;
20import java.util.Objects;
21import java.util.Set;
22import java.util.concurrent.CompletableFuture;
23import java.util.stream.Collectors;
24
25import org.onlab.util.Tools;
26import org.onosproject.store.service.AsyncConsistentMap;
27import org.onosproject.store.service.AsyncDistributedSet;
28import org.onosproject.store.service.MapEvent;
29import org.onosproject.store.service.MapEventListener;
30import org.onosproject.store.service.SetEvent;
31import org.onosproject.store.service.SetEventListener;
32
33import com.google.common.collect.ImmutableSet;
34import com.google.common.collect.Maps;
35import com.google.common.collect.Sets;
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -080036import org.onosproject.utils.MeteringAgent;
Madan Jampania090a112016-01-18 16:38:17 -080037
38/**
39 * Implementation of {@link AsyncDistributedSet}.
40 *
41 * @param <E> set entry type
42 */
43public class DefaultAsyncDistributedSet<E> implements AsyncDistributedSet<E> {
44
45 private static final String CONTAINS = "contains";
46 private static final String PRIMITIVE_NAME = "distributedSet";
47 private static final String SIZE = "size";
48 private static final String IS_EMPTY = "isEmpty";
49 private static final String ADD = "add";
50 private static final String REMOVE = "remove";
51 private static final String CONTAINS_ALL = "containsAll";
52 private static final String ADD_ALL = "addAll";
53 private static final String RETAIN_ALL = "retainAll";
54 private static final String REMOVE_ALL = "removeAll";
55 private static final String CLEAR = "clear";
56 private static final String GET_AS_IMMUTABLE_SET = "getAsImmutableSet";
57
58 private final String name;
59 private final AsyncConsistentMap<E, Boolean> backingMap;
60 private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
61 private final MeteringAgent monitor;
62
63 public DefaultAsyncDistributedSet(AsyncConsistentMap<E, Boolean> backingMap, String name, boolean meteringEnabled) {
64 this.backingMap = backingMap;
65 this.name = name;
66 monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
67 }
68
69 @Override
70 public String name() {
71 return name;
72 }
73
74 @Override
75 public CompletableFuture<Integer> size() {
76 final MeteringAgent.Context timer = monitor.startTimer(SIZE);
77 return backingMap.size().whenComplete((r, e) -> timer.stop(null));
78 }
79
80 @Override
81 public CompletableFuture<Boolean> isEmpty() {
82 final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
83 return backingMap.isEmpty().whenComplete((r, e) -> timer.stop(null));
84 }
85
86 @Override
87 public CompletableFuture<Boolean> contains(E element) {
88 final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
89 return backingMap.containsKey(element).whenComplete((r, e) -> timer.stop(null));
90 }
91
92 @Override
93 public CompletableFuture<Boolean> add(E entry) {
94 final MeteringAgent.Context timer = monitor.startTimer(ADD);
95 return backingMap.putIfAbsent(entry, true).thenApply(Objects::isNull).whenComplete((r, e) -> timer.stop(null));
96 }
97
98 @Override
99 public CompletableFuture<Boolean> remove(E entry) {
100 final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
101 return backingMap.remove(entry, true).whenComplete((r, e) -> timer.stop(null));
102 }
103
104 @Override
105 public CompletableFuture<Boolean> containsAll(Collection<? extends E> c) {
106 final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
107 return Tools.allOf(c.stream().map(this::contains).collect(Collectors.toList())).thenApply(v ->
108 v.stream().reduce(Boolean::logicalAnd).orElse(true)).whenComplete((r, e) -> timer.stop(null));
109 }
110
111 @Override
112 public CompletableFuture<Boolean> addAll(Collection<? extends E> c) {
113 final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
114 return Tools.allOf(c.stream().map(this::add).collect(Collectors.toList())).thenApply(v ->
115 v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(null));
116 }
117
118 @Override
119 public CompletableFuture<Boolean> retainAll(Collection<? extends E> c) {
120 final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
121 return backingMap.keySet().thenApply(set -> Sets.difference(set, Sets.newHashSet(c)))
122 .thenCompose(this::removeAll)
123 .whenComplete((r, e) -> timer.stop(null));
124 }
125
126 @Override
127 public CompletableFuture<Boolean> removeAll(Collection<? extends E> c) {
128 final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
129 return Tools.allOf(c.stream().map(this::remove).collect(Collectors.toList())).thenApply(v ->
130 v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(null));
131 }
132
133 @Override
134 public CompletableFuture<Void> clear() {
135 final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
136 return backingMap.clear().whenComplete((r, e) -> timer.stop(null));
137 }
138
139 @Override
140 public CompletableFuture<? extends Set<E>> getAsImmutableSet() {
141 final MeteringAgent.Context timer = monitor.startTimer(GET_AS_IMMUTABLE_SET);
142 return backingMap.keySet().thenApply(s -> ImmutableSet.copyOf(s)).whenComplete((r, e) -> timer.stop(null));
143 }
144
145 @Override
146 public CompletableFuture<Void> addListener(SetEventListener<E> listener) {
147 MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
148 if (mapEvent.type() == MapEvent.Type.INSERT) {
149 listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
150 } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
151 listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
152 }
153 };
154 if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
155 return backingMap.addListener(mapEventListener);
156 }
157 return CompletableFuture.completedFuture(null);
158 }
159
160 @Override
161 public CompletableFuture<Void> removeListener(SetEventListener<E> listener) {
162 MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
163 if (mapEventListener != null) {
164 return backingMap.removeListener(mapEventListener);
165 }
166 return CompletableFuture.completedFuture(null);
167 }
168}