blob: 31e22a86d5b3ae976b413ed8c58486c2f483847c [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampanif4c88502016-01-21 12:35:36 -080017package org.onosproject.store.primitives.impl;
Madan Jampani94c23532015-02-05 17:40:01 -080018
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070019import com.google.common.collect.ImmutableList;
20import com.google.common.collect.Lists;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
Madan Jampanif2f086c2016-01-13 16:15:39 -080023
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070024import net.kuujo.copycat.Task;
25import net.kuujo.copycat.cluster.Cluster;
26import net.kuujo.copycat.resource.ResourceState;
Madan Jampanif2f086c2016-01-13 16:15:39 -080027
28import org.onlab.util.Match;
Madan Jampani74da78b2016-02-09 21:18:36 -080029import org.onosproject.store.primitives.MapUpdate;
Madan Jampanicadd70b2016-02-08 13:45:43 -080030import org.onosproject.store.primitives.resources.impl.CommitResult;
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070031import org.onosproject.store.service.Versioned;
32
Madan Jampani94c23532015-02-05 17:40:01 -080033import java.util.Collection;
34import java.util.List;
35import java.util.Map;
36import java.util.Map.Entry;
37import java.util.Set;
38import java.util.concurrent.CompletableFuture;
39import java.util.concurrent.CopyOnWriteArrayList;
40import java.util.concurrent.atomic.AtomicBoolean;
41import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani648451f2015-07-21 22:09:05 -070042import java.util.function.Consumer;
Madan Jampanif1b8e172015-03-23 11:42:02 -070043import java.util.stream.Collectors;
Madan Jampani393e0f02015-02-12 07:35:39 +053044
Madan Jampani7f72c3f2015-03-01 17:34:59 -080045import static com.google.common.base.Preconditions.checkState;
46
Madan Jampani94c23532015-02-05 17:40:01 -080047/**
48 * A database that partitions the keys across one or more database partitions.
49 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070050public class PartitionedDatabase implements Database {
Madan Jampani94c23532015-02-05 17:40:01 -080051
Madan Jampanif1b8e172015-03-23 11:42:02 -070052 private final String name;
53 private final Partitioner<String> partitioner;
54 private final List<Database> partitions;
Madan Jampani7f72c3f2015-03-01 17:34:59 -080055 private final AtomicBoolean isOpen = new AtomicBoolean(false);
Madan Jampanif1b8e172015-03-23 11:42:02 -070056 private static final String DB_NOT_OPEN = "Partitioned Database is not open";
Madan Jampani50589ac2015-06-08 11:38:46 -070057 private TransactionManager transactionManager;
Madan Jampani94c23532015-02-05 17:40:01 -080058
Madan Jampanif1b8e172015-03-23 11:42:02 -070059 public PartitionedDatabase(
60 String name,
61 Collection<Database> partitions) {
62 this.name = name;
63 this.partitions = partitions
64 .stream()
65 .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
66 .collect(Collectors.toList());
67 this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
68 }
69
70 /**
71 * Returns the databases for individual partitions.
72 * @return list of database partitions
73 */
74 public List<Database> getPartitions() {
75 return partitions;
Madan Jampani94c23532015-02-05 17:40:01 -080076 }
77
Madan Jampani7f72c3f2015-03-01 17:34:59 -080078 /**
79 * Returns true if the database is open.
80 * @return true if open, false otherwise
81 */
Madan Jampanif1b8e172015-03-23 11:42:02 -070082 @Override
Madan Jampani7f72c3f2015-03-01 17:34:59 -080083 public boolean isOpen() {
84 return isOpen.get();
85 }
86
Madan Jampani94c23532015-02-05 17:40:01 -080087 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070088 public CompletableFuture<Set<String>> maps() {
Madan Jampania89f8f92015-04-01 14:39:54 -070089 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -070090 Set<String> mapNames = Sets.newConcurrentHashSet();
Madan Jampania89f8f92015-04-01 14:39:54 -070091 return CompletableFuture.allOf(partitions
92 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -070093 .map(db -> db.maps().thenApply(mapNames::addAll))
Madan Jampania89f8f92015-04-01 14:39:54 -070094 .toArray(CompletableFuture[]::new))
Madan Jampani7804c992015-07-20 13:20:19 -070095 .thenApply(v -> mapNames);
Madan Jampania89f8f92015-04-01 14:39:54 -070096 }
97
98 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070099 public CompletableFuture<Map<String, Long>> counters() {
100 checkState(isOpen.get(), DB_NOT_OPEN);
101 Map<String, Long> counters = Maps.newConcurrentMap();
102 return CompletableFuture.allOf(partitions
103 .stream()
104 .map(db -> db.counters()
andreafd912ac2015-10-02 14:58:35 -0700105 .thenApply(m -> {
106 counters.putAll(m);
107 return null;
108 }))
Madan Jampanib5d72d52015-04-03 16:53:50 -0700109 .toArray(CompletableFuture[]::new))
110 .thenApply(v -> counters);
111 }
112
113 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700114 public CompletableFuture<Integer> mapSize(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800115 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800116 AtomicInteger totalSize = new AtomicInteger(0);
117 return CompletableFuture.allOf(partitions
andreafd912ac2015-10-02 14:58:35 -0700118 .stream()
119 .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
120 .toArray(CompletableFuture[]::new))
Madan Jampani94c23532015-02-05 17:40:01 -0800121 .thenApply(v -> totalSize.get());
122 }
123
124 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700125 public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800126 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700127 return mapSize(mapName).thenApply(size -> size == 0);
Madan Jampani94c23532015-02-05 17:40:01 -0800128 }
129
130 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700131 public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800132 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700133 return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800134 }
135
136 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700137 public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800138 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800139 AtomicBoolean containsValue = new AtomicBoolean(false);
140 return CompletableFuture.allOf(partitions
andreafd912ac2015-10-02 14:58:35 -0700141 .stream()
142 .map(p -> p.mapContainsValue(mapName, value)
143 .thenApply(v -> containsValue.compareAndSet(false, v)))
144 .toArray(CompletableFuture[]::new))
Madan Jampani94c23532015-02-05 17:40:01 -0800145 .thenApply(v -> containsValue.get());
146 }
147
148 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700149 public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800150 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani7804c992015-07-20 13:20:19 -0700151 return partitioner.getPartition(mapName, key).mapGet(mapName, key);
Madan Jampani94c23532015-02-05 17:40:01 -0800152 }
153
154 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700155 public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
156 String mapName, String key, Match<byte[]> valueMatch,
157 Match<Long> versionMatch, byte[] value) {
158 return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value);
159
Madan Jampani94c23532015-02-05 17:40:01 -0800160 }
161
162 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700163 public CompletableFuture<Result<Void>> mapClear(String mapName) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700164 AtomicBoolean isLocked = new AtomicBoolean(false);
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800165 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800166 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800167 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700168 .map(p -> p.mapClear(mapName)
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700169 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
170 .toArray(CompletableFuture[]::new))
171 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
Madan Jampani94c23532015-02-05 17:40:01 -0800172 }
173
174 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700175 public CompletableFuture<Set<String>> mapKeySet(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800176 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800177 Set<String> keySet = Sets.newConcurrentHashSet();
178 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800179 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700180 .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800181 .toArray(CompletableFuture[]::new))
182 .thenApply(v -> keySet);
183 }
184
185 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700186 public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800187 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800188 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
189 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800190 .stream()
Madan Jampani7804c992015-07-20 13:20:19 -0700191 .map(p -> p.mapValues(mapName).thenApply(values::addAll))
Madan Jampani94c23532015-02-05 17:40:01 -0800192 .toArray(CompletableFuture[]::new))
193 .thenApply(v -> values);
194 }
195
196 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700197 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800198 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani94c23532015-02-05 17:40:01 -0800199 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
200 return CompletableFuture.allOf(partitions
andreafd912ac2015-10-02 14:58:35 -0700201 .stream()
202 .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
203 .toArray(CompletableFuture[]::new))
Madan Jampani94c23532015-02-05 17:40:01 -0800204 .thenApply(v -> entrySet);
205 }
206
207 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700208 public CompletableFuture<Long> counterGet(String counterName) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700209 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700210 return partitioner.getPartition(counterName, counterName).counterGet(counterName);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700211 }
212
213 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700214 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
Madan Jampanib5d72d52015-04-03 16:53:50 -0700215 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampani04aeb452015-05-02 16:12:24 -0700216 return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta);
217 }
218
219 @Override
220 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
221 checkState(isOpen.get(), DB_NOT_OPEN);
222 return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700223 }
224
andreafd912ac2015-10-02 14:58:35 -0700225 @Override
226 public CompletableFuture<Void> counterSet(String counterName, long value) {
227 checkState(isOpen.get(), DB_NOT_OPEN);
228 return partitioner.getPartition(counterName, counterName).counterSet(counterName, value);
229 }
Madan Jampani63c659f2015-06-11 00:52:58 -0700230
231 @Override
Aaron Kruglikov82fd6322015-10-06 12:02:46 -0700232 public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
233 checkState(isOpen.get(), DB_NOT_OPEN);
234 return partitioner.getPartition(counterName, counterName).
235 counterCompareAndSet(counterName, expectedValue, updateValue);
236
237 }
238
239 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700240 public CompletableFuture<Long> queueSize(String queueName) {
241 checkState(isOpen.get(), DB_NOT_OPEN);
242 return partitioner.getPartition(queueName, queueName).queueSize(queueName);
243 }
244
245 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700246 public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
Madan Jampani63c659f2015-06-11 00:52:58 -0700247 checkState(isOpen.get(), DB_NOT_OPEN);
248 return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
249 }
250
251 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700252 public CompletableFuture<byte[]> queuePop(String queueName) {
Madan Jampani63c659f2015-06-11 00:52:58 -0700253 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampania6d787b2015-08-11 11:02:02 -0700254 return partitioner.getPartition(queueName, queueName).queuePop(queueName);
Madan Jampani63c659f2015-06-11 00:52:58 -0700255 }
256
257 @Override
258 public CompletableFuture<byte[]> queuePeek(String queueName) {
259 checkState(isOpen.get(), DB_NOT_OPEN);
260 return partitioner.getPartition(queueName, queueName).queuePeek(queueName);
261 }
262
Madan Jampanib5d72d52015-04-03 16:53:50 -0700263 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700264 public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700265 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
266 if (subTransactions.isEmpty()) {
Madan Jampanibab51a42015-08-10 13:53:35 -0700267 return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of()));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700268 } else if (subTransactions.size() == 1) {
269 Entry<Database, Transaction> entry =
270 subTransactions.entrySet().iterator().next();
271 return entry.getKey().prepareAndCommit(entry.getValue());
Madan Jampani94c23532015-02-05 17:40:01 -0800272 } else {
Madan Jampani98166f92015-06-26 15:12:33 -0700273 if (transactionManager == null) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700274 throw new IllegalStateException("TransactionManager is not initialized");
275 }
Madan Jampanicadd70b2016-02-08 13:45:43 -0800276 return transactionManager.execute(transaction)
277 .thenApply(r -> r == CommitResult.OK
278 ? CommitResponse.success(ImmutableList.of()) : CommitResponse.failure());
Madan Jampani94c23532015-02-05 17:40:01 -0800279 }
280 }
281
282 @Override
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700283 public CompletableFuture<Boolean> prepare(Transaction transaction) {
284 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
285 AtomicBoolean status = new AtomicBoolean(true);
286 return CompletableFuture.allOf(subTransactions.entrySet()
287 .stream()
andreafd912ac2015-10-02 14:58:35 -0700288 .map(entry -> entry
289 .getKey()
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700290 .prepare(entry.getValue())
291 .thenApply(v -> status.compareAndSet(true, v)))
292 .toArray(CompletableFuture[]::new))
293 .thenApply(v -> status.get());
294 }
295
296 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700297 public CompletableFuture<CommitResponse> commit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700298 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
Madan Jampanibab51a42015-08-10 13:53:35 -0700299 AtomicBoolean success = new AtomicBoolean(true);
300 List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700301 return CompletableFuture.allOf(subTransactions.entrySet()
andreafd912ac2015-10-02 14:58:35 -0700302 .stream()
303 .map(entry -> entry.getKey().commit(entry.getValue())
304 .thenAccept(response -> {
305 success.set(success.get() && response.success());
306 if (success.get()) {
307 allUpdates.addAll(response.updates());
308 }
309 }))
310 .toArray(CompletableFuture[]::new))
Madan Jampanibab51a42015-08-10 13:53:35 -0700311 .thenApply(v -> success.get() ?
312 CommitResponse.success(allUpdates) : CommitResponse.failure());
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700313 }
314
315 @Override
316 public CompletableFuture<Boolean> rollback(Transaction transaction) {
317 Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
318 return CompletableFuture.allOf(subTransactions.entrySet()
319 .stream()
320 .map(entry -> entry.getKey().rollback(entry.getValue()))
andreafd912ac2015-10-02 14:58:35 -0700321 .toArray(CompletableFuture[]::new))
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700322 .thenApply(v -> true);
323 }
324
325 @Override
Madan Jampanif1b8e172015-03-23 11:42:02 -0700326 public CompletableFuture<Database> open() {
327 return CompletableFuture.allOf(partitions
328 .stream()
329 .map(Database::open)
330 .toArray(CompletableFuture[]::new))
331 .thenApply(v -> {
332 isOpen.set(true);
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700333 return this;
334 });
Madan Jampani94c23532015-02-05 17:40:01 -0800335 }
336
337 @Override
338 public CompletableFuture<Void> close() {
Madan Jampani7f72c3f2015-03-01 17:34:59 -0800339 checkState(isOpen.get(), DB_NOT_OPEN);
Madan Jampanif1b8e172015-03-23 11:42:02 -0700340 return CompletableFuture.allOf(partitions
Madan Jampani94c23532015-02-05 17:40:01 -0800341 .stream()
342 .map(database -> database.close())
Madan Jampani09342702015-02-05 23:32:40 -0800343 .toArray(CompletableFuture[]::new));
Madan Jampani94c23532015-02-05 17:40:01 -0800344 }
Madan Jampanif1b8e172015-03-23 11:42:02 -0700345
346 @Override
347 public boolean isClosed() {
348 return !isOpen.get();
349 }
350
351 @Override
352 public String name() {
353 return name;
354 }
355
356 @Override
357 public Cluster cluster() {
358 throw new UnsupportedOperationException();
359 }
360
361 @Override
362 public Database addStartupTask(Task<CompletableFuture<Void>> task) {
363 throw new UnsupportedOperationException();
364 }
365
366 @Override
367 public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
368 throw new UnsupportedOperationException();
369 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700370
Madan Jampanied1b7fc2015-04-27 23:30:07 -0700371 @Override
372 public ResourceState state() {
373 throw new UnsupportedOperationException();
374 }
375
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700376 private Map<Database, Transaction> createSubTransactions(
377 Transaction transaction) {
Madan Jampanicadd70b2016-02-08 13:45:43 -0800378 Map<Database, List<MapUpdate<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
379 for (MapUpdate<String, byte[]> update : transaction.updates()) {
Madan Jampani7804c992015-07-20 13:20:19 -0700380 Database partition = partitioner.getPartition(update.mapName(), update.key());
Madan Jampanicadd70b2016-02-08 13:45:43 -0800381 List<MapUpdate<String, byte[]>> partitionUpdates =
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700382 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
383 partitionUpdates.add(update);
384 }
385 Map<Database, Transaction> subTransactions = Maps.newHashMap();
Madan Jampanicadd70b2016-02-08 13:45:43 -0800386 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new Transaction(transaction.id(), v)));
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700387 return subTransactions;
388 }
Madan Jampani50589ac2015-06-08 11:38:46 -0700389
Ayaka Koshibe94cc01b2015-06-26 15:39:11 -0700390 protected void setTransactionManager(TransactionManager transactionManager) {
Madan Jampani50589ac2015-06-08 11:38:46 -0700391 this.transactionManager = transactionManager;
392 }
Madan Jampani648451f2015-07-21 22:09:05 -0700393
394 @Override
Madan Jampani648451f2015-07-21 22:09:05 -0700395 public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
Madan Jampani34fec842015-07-22 14:05:08 -0700396 partitions.forEach(p -> p.registerConsumer(consumer));
Madan Jampani648451f2015-07-21 22:09:05 -0700397 }
398
399 @Override
400 public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
Madan Jampani34fec842015-07-22 14:05:08 -0700401 partitions.forEach(p -> p.unregisterConsumer(consumer));
Madan Jampani648451f2015-07-21 22:09:05 -0700402 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700403}
andreafd912ac2015-10-02 14:58:35 -0700404