blob: a78a44aebc038173d8a4639e00426ff6b696fe47 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070019import com.google.common.collect.ImmutableList;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
Madan Jampanif2f086c2016-01-13 16:15:39 -080023
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070024import net.kuujo.copycat.Task;
25import net.kuujo.copycat.cluster.Cluster;
26import net.kuujo.copycat.resource.ResourceState;
Madan Jampanif2f086c2016-01-13 16:15:39 -080027
28import org.onlab.util.Match;
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070029import org.onosproject.store.service.DatabaseUpdate;
30import org.onosproject.store.service.Transaction;
31import org.onosproject.store.service.Versioned;
32
Madan Jampani94c23532015-02-05 17:40:01 -080033import java.util.Collection;
34import java.util.List;
35import java.util.Map;
36import java.util.Map.Entry;
37import java.util.Set;
38import java.util.concurrent.CompletableFuture;
39import java.util.concurrent.CopyOnWriteArrayList;
40import java.util.concurrent.atomic.AtomicBoolean;
41import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani648451f2015-07-21 22:09:05 -070042import java.util.function.Consumer;
Madan Jampanif1b8e172015-03-23 11:42:02 -070043import java.util.stream.Collectors;
Madan Jampani393e0f02015-02-12 07:35:39 +053044
Madan Jampani7f72c3f2015-03-01 17:34:59 -080045import static com.google.common.base.Preconditions.checkState;
46
Madan Jampani94c23532015-02-05 17:40:01 -080047/**
48 * A database that partitions the keys across one or more database partitions.
49 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070050public class PartitionedDatabase implements Database {
Madan Jampani94c23532015-02-05 17:40:01 -080051
Madan Jampanif1b8e172015-03-23 11:42:02 -070052 private final String name;
53 private final Partitioner<String> partitioner;
54 private final List<Database> partitions;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080055 private final AtomicBoolean isOpen = new AtomicBoolean(false);
Madan Jampanif1b8e172015-03-23 11:42:02 -070056 private static final String DB_NOT_OPEN = "Partitioned Database is not open";
Madan Jampani50589ac2015-06-08 11:38:46 -070057 private TransactionManager transactionManager;
Madan Jampani94c23532015-02-05 17:40:01 -080058
Madan Jampanif1b8e172015-03-23 11:42:02 -070059 public PartitionedDatabase(
60 String name,
61 Collection<Database> partitions) {
62 this.name = name;
63 this.partitions = partitions
64 .stream()
65 .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
66 .collect(Collectors.toList());
67 this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
68 }
69
70 /**
71 * Returns the databases for individual partitions.
72 * @return list of database partitions
73 */
74 public List<Database> getPartitions() {
75 return partitions;
Madan Jampani94c23532015-02-05 17:40:01 -080076 }
77
Madan Jampani7f72c3f2015-03-01 17:34:59 -080078 /**
79 * Returns true if the database is open.
80 * @return true if open, false otherwise
81 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070082 @Override
Madan Jampani7f72c3f2015-03-01 17:34:59 -080083 public boolean isOpen() {
84 return isOpen.get();
85 }
86
Madan Jampani94c23532015-02-05 17:40:01 -080087 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070088 public CompletableFuture<Set<String>> maps() {
Madan Jampania89f8f92015-04-01 14:39:54 -070089 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -070090 Set<String> mapNames = Sets.newConcurrentHashSet();
Madan Jampania89f8f92015-04-01 14:39:54 -070091 return CompletableFuture.allOf(partitions
92 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -070093 .map(db -> db.maps().thenApply(mapNames::addAll))
Madan Jampania89f8f92015-04-01 14:39:54 -070094 .toArray(CompletableFuture[]::new))
Madan Jampani7804c992015-07-20 13:20:19 -070095 .thenApply(v -> mapNames);
Madan Jampania89f8f92015-04-01 14:39:54 -070096 }
97
98 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070099 public CompletableFuture<Map<String, Long>> counters() {
100 checkState(isOpen.get(), DB_NOT_OPEN);
101 Map<String, Long> counters = Maps.newConcurrentMap();
102 return CompletableFuture.allOf(partitions
103 .stream()
104 .map(db -> db.counters()
andreafd912ac2015-10-02 14:58:35 -0700105 .thenApply(m -> {
106 counters.putAll(m);
107 return null;
108 }))
Madan Jampanib5d72d52015-04-03 16:53:50 -0700109 .toArray(CompletableFuture[]::new))
110 .thenApply(v -> counters);
111 }
112
113 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700114 public CompletableFuture<Integer> mapSize(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800115 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800116 AtomicInteger totalSize = new AtomicInteger(0);
117 return CompletableFuture.allOf(partitions
andreafd912ac2015-10-02 14:58:35 -0700118 .stream()
119 .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
120 .toArray(CompletableFuture[]::new))
Madan Jampani94c23532015-02-05 17:40:01 -0800121 .thenApply(v -> totalSize.get());
122 }
123
124 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700125 public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800126 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700127 return mapSize(mapName).thenApply(size -> size == 0);
Madan Jampani94c23532015-02-05 17:40:01 -0800128 }
129
130 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700131 public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800132 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700133 return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800134 }
135
136 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700137 public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800138 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800139 AtomicBoolean containsValue = new AtomicBoolean(false);
140 return CompletableFuture.allOf(partitions
andreafd912ac2015-10-02 14:58:35 -0700141 .stream()
142 .map(p -> p.mapContainsValue(mapName, value)
143 .thenApply(v -> containsValue.compareAndSet(false, v)))
144 .toArray(CompletableFuture[]::new))
Madan Jampani94c23532015-02-05 17:40:01 -0800145 .thenApply(v -> containsValue.get());
146 }
147
148 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700149 public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800150 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700151 return partitioner.getPartition(mapName, key).mapGet(mapName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800152 }
153
154 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700155 public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
156 String mapName, String key, Match<byte[]> valueMatch,
157 Match<Long> versionMatch, byte[] value) {
158 return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value);
159
Madan Jampani94c23532015-02-05 17:40:01 -0800160 }
161
162 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700163 public CompletableFuture<Result<Void>> mapClear(String mapName) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700164 AtomicBoolean isLocked = new AtomicBoolean(false);
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800165 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800166 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800167 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700168 .map(p -> p.mapClear(mapName)
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700169 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
170 .toArray(CompletableFuture[]::new))
171 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
Madan Jampani94c23532015-02-05 17:40:01 -0800172 }
173
174 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700175 public CompletableFuture<Set<String>> mapKeySet(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800176 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800177 Set<String> keySet = Sets.newConcurrentHashSet();
178 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800179 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700180 .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800181 .toArray(CompletableFuture[]::new))
182 .thenApply(v -> keySet);
183 }
184
185 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700186 public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800187 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800188 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
189 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800190 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700191 .map(p -> p.mapValues(mapName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800192 .toArray(CompletableFuture[]::new))
193 .thenApply(v -> values);
194 }
195
196 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700197 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800198 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800199 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
200 return CompletableFuture.allOf(partitions
andreafd912ac2015-10-02 14:58:35 -0700201 .stream()
202 .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
203 .toArray(CompletableFuture[]::new))
Madan Jampani94c23532015-02-05 17:40:01 -0800204 .thenApply(v -> entrySet);
205 }
206
207 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700208 public CompletableFuture<Long> counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700209 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700210 return partitioner.getPartition(counterName, counterName).counterGet(counterName);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700211 }
212
213 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700214 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700215 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700216 return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta);
217 }
218
219 @Override
220 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
221 checkState(isOpen.get(), DB_NOT_OPEN);
222 return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700223 }
224
andreafd912ac2015-10-02 14:58:35 -0700225 @Override
226 public CompletableFuture<Void> counterSet(String counterName, long value) {
227 checkState(isOpen.get(), DB_NOT_OPEN);
228 return partitioner.getPartition(counterName, counterName).counterSet(counterName, value);
229 }
Madan Jampani63c659f2015-06-11 00:52:58 -0700230
231 @Override
Aaron Kruglikov82fd6322015-10-06 12:02:46 -0700232 public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
233 checkState(isOpen.get(), DB_NOT_OPEN);
234 return partitioner.getPartition(counterName, counterName).
235 counterCompareAndSet(counterName, expectedValue, updateValue);
236
237 }
238
239 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700240 public CompletableFuture<Long> queueSize(String queueName) {
241 checkState(isOpen.get(), DB_NOT_OPEN);
242 return partitioner.getPartition(queueName, queueName).queueSize(queueName);
243 }
244
245 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700246 public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
Madan Jampani63c659f2015-06-11 00:52:58 -0700247 checkState(isOpen.get(), DB_NOT_OPEN);
248 return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
249 }
250
251 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700252 public CompletableFuture<byte[]> queuePop(String queueName) {
Madan Jampani63c659f2015-06-11 00:52:58 -0700253 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampania6d787b2015-08-11 11:02:02 -0700254 return partitioner.getPartition(queueName, queueName).queuePop(queueName);
Madan Jampani63c659f2015-06-11 00:52:58 -0700255 }
256
257 @Override
258 public CompletableFuture<byte[]> queuePeek(String queueName) {
259 checkState(isOpen.get(), DB_NOT_OPEN);
260 return partitioner.getPartition(queueName, queueName).queuePeek(queueName);
261 }
262
Madan Jampanib5d72d52015-04-03 16:53:50 -0700263 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700264 public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700265 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
266 if (subTransactions.isEmpty()) {
Madan Jampanibab51a42015-08-10 13:53:35 -0700267 return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700268 } else if (subTransactions.size() == 1) {
269 Entry<Database, Transaction> entry =
270 subTransactions.entrySet().iterator().next();
271 return entry.getKey().prepareAndCommit(entry.getValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800272 } else {
Madan Jampani98166f92015-06-26 15:12:33 -0700273 if (transactionManager == null) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700274 throw new IllegalStateException("TransactionManager is not initialized");
275 }
276 return transactionManager.execute(transaction);
Madan Jampani94c23532015-02-05 17:40:01 -0800277 }
278 }
279
280 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700281 public CompletableFuture<Boolean> prepare(Transaction transaction) {
282 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
283 AtomicBoolean status = new AtomicBoolean(true);
284 return CompletableFuture.allOf(subTransactions.entrySet()
285 .stream()
andreafd912ac2015-10-02 14:58:35 -0700286 .map(entry -> entry
287 .getKey()
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700288 .prepare(entry.getValue())
289 .thenApply(v -> status.compareAndSet(true, v)))
290 .toArray(CompletableFuture[]::new))
291 .thenApply(v -> status.get());
292 }
293
294 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700295 public CompletableFuture<CommitResponse> commit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700296 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
Madan Jampanibab51a42015-08-10 13:53:35 -0700297 AtomicBoolean success = new AtomicBoolean(true);
298 List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700299 return CompletableFuture.allOf(subTransactions.entrySet()
andreafd912ac2015-10-02 14:58:35 -0700300 .stream()
301 .map(entry -> entry.getKey().commit(entry.getValue())
302 .thenAccept(response -> {
303 success.set(success.get() && response.success());
304 if (success.get()) {
305 allUpdates.addAll(response.updates());
306 }
307 }))
308 .toArray(CompletableFuture[]::new))
Madan Jampanibab51a42015-08-10 13:53:35 -0700309 .thenApply(v -> success.get() ?
310 CommitResponse.success(allUpdates) : CommitResponse.failure());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700311 }
312
313 @Override
314 public CompletableFuture<Boolean> rollback(Transaction transaction) {
315 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
316 return CompletableFuture.allOf(subTransactions.entrySet()
317 .stream()
318 .map(entry -> entry.getKey().rollback(entry.getValue()))
andreafd912ac2015-10-02 14:58:35 -0700319 .toArray(CompletableFuture[]::new))
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700320 .thenApply(v -> true);
321 }
322
323 @Override
Madan Jampanif1b8e172015-03-23 11:42:02 -0700324 public CompletableFuture<Database> open() {
325 return CompletableFuture.allOf(partitions
326 .stream()
327 .map(Database::open)
328 .toArray(CompletableFuture[]::new))
329 .thenApply(v -> {
330 isOpen.set(true);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700331 return this;
332 });
Madan Jampani94c23532015-02-05 17:40:01 -0800333 }
334
335 @Override
336 public CompletableFuture<Void> close() {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800337 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700338 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800339 .stream()
340 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800341 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800342 }
Madan Jampanif1b8e172015-03-23 11:42:02 -0700343
344 @Override
345 public boolean isClosed() {
346 return !isOpen.get();
347 }
348
349 @Override
350 public String name() {
351 return name;
352 }
353
354 @Override
355 public Cluster cluster() {
356 throw new UnsupportedOperationException();
357 }
358
359 @Override
360 public Database addStartupTask(Task<CompletableFuture<Void>> task) {
361 throw new UnsupportedOperationException();
362 }
363
364 @Override
365 public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
366 throw new UnsupportedOperationException();
367 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700368
Madan Jampanied1b7fc2015-04-27 23:30:07 -0700369 @Override
370 public ResourceState state() {
371 throw new UnsupportedOperationException();
372 }
373
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700374 private Map<Database, Transaction> createSubTransactions(
375 Transaction transaction) {
376 Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
377 for (DatabaseUpdate update : transaction.updates()) {
Madan Jampani7804c992015-07-20 13:20:19 -0700378 Database partition = partitioner.getPartition(update.mapName(), update.key());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700379 List<DatabaseUpdate> partitionUpdates =
380 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
381 partitionUpdates.add(update);
382 }
383 Map<Database, Transaction> subTransactions = Maps.newHashMap();
384 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700385 return subTransactions;
386 }
Madan Jampani50589ac2015-06-08 11:38:46 -0700387
Ayaka Koshibe94cc01b2015-06-26 15:39:11 -0700388 protected void setTransactionManager(TransactionManager transactionManager) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700389 this.transactionManager = transactionManager;
390 }
Madan Jampani648451f2015-07-21 22:09:05 -0700391
392 @Override
Madan Jampani648451f2015-07-21 22:09:05 -0700393 public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
Madan Jampani34fec842015-07-22 14:05:08 -0700394 partitions.forEach(p -> p.registerConsumer(consumer));
Madan Jampani648451f2015-07-21 22:09:05 -0700395 }
396
397 @Override
398 public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
Madan Jampani34fec842015-07-22 14:05:08 -0700399 partitions.forEach(p -> p.unregisterConsumer(consumer));
Madan Jampani648451f2015-07-21 22:09:05 -0700400 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700401}
andreafd912ac2015-10-02 14:58:35 -0700402