blob: 59b65657dca5b1a9124b274c13b97cc0c9690d39 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070019import com.google.common.collect.Sets;
Madan Jampanif2f086c2016-01-13 16:15:39 -080020
Madan Jampani94c23532015-02-05 17:40:01 -080021import net.kuujo.copycat.resource.internal.AbstractResource;
Madan Jampanied1b7fc2015-04-27 23:30:07 -070022import net.kuujo.copycat.resource.internal.ResourceManager;
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070023import net.kuujo.copycat.state.StateMachine;
Madan Jampani94c23532015-02-05 17:40:01 -080024import net.kuujo.copycat.state.internal.DefaultStateMachine;
25import net.kuujo.copycat.util.concurrent.Futures;
Madan Jampani648451f2015-07-21 22:09:05 -070026import net.kuujo.copycat.util.function.TriConsumer;
Madan Jampanif2f086c2016-01-13 16:15:39 -080027
28import org.onlab.util.Match;
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070029import org.onosproject.store.service.Transaction;
30import org.onosproject.store.service.Versioned;
Madan Jampani94c23532015-02-05 17:40:01 -080031
32import java.util.Collection;
Madan Jampani94c23532015-02-05 17:40:01 -080033import java.util.Map;
34import java.util.Set;
35import java.util.concurrent.CompletableFuture;
Madan Jampani648451f2015-07-21 22:09:05 -070036import java.util.function.Consumer;
Madan Jampani94c23532015-02-05 17:40:01 -080037import java.util.function.Supplier;
38
39/**
40 * Default database.
41 */
42public class DefaultDatabase extends AbstractResource<Database> implements Database {
Madan Jampanif1b8e172015-03-23 11:42:02 -070043 private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
44 private DatabaseProxy<String, byte[]> proxy;
Madan Jampani648451f2015-07-21 22:09:05 -070045 private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
46 private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
Madan Jampani94c23532015-02-05 17:40:01 -080047
andreafd912ac2015-10-02 14:58:35 -070048 @SuppressWarnings({"unchecked", "rawtypes"})
Madan Jampanied1b7fc2015-04-27 23:30:07 -070049 public DefaultDatabase(ResourceManager context) {
Madan Jampanif1b8e172015-03-23 11:42:02 -070050 super(context);
Madan Jampani40537ca2015-07-14 19:50:33 -070051 this.stateMachine = new DefaultStateMachine(context,
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070052 DatabaseState.class,
53 DefaultDatabaseState.class,
54 DefaultDatabase.class.getClassLoader());
Madan Jampani648451f2015-07-21 22:09:05 -070055 this.stateMachine.addStartupTask(() -> {
56 stateMachine.registerWatcher(watcher);
57 return CompletableFuture.completedFuture(null);
58 });
59 this.stateMachine.addShutdownTask(() -> {
60 stateMachine.unregisterWatcher(watcher);
61 return CompletableFuture.completedFuture(null);
62 });
Madan Jampani94c23532015-02-05 17:40:01 -080063 }
Madan Jampani94c23532015-02-05 17:40:01 -080064
Madan Jampanif1b8e172015-03-23 11:42:02 -070065 /**
66 * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
67 * return the completed future result.
68 *
69 * @param supplier The supplier to call if the database is open.
andreafd912ac2015-10-02 14:58:35 -070070 * @param <T> The future result type.
Madan Jampanif1b8e172015-03-23 11:42:02 -070071 * @return A completable future that if this database is closed is immediately failed.
72 */
73 protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
74 if (proxy == null) {
75 return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
76 }
77 return supplier.get();
78 }
Madan Jampani94c23532015-02-05 17:40:01 -080079
Madan Jampanif1b8e172015-03-23 11:42:02 -070080 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070081 public CompletableFuture<Set<String>> maps() {
82 return checkOpen(() -> proxy.maps());
Madan Jampania89f8f92015-04-01 14:39:54 -070083 }
84
85 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070086 public CompletableFuture<Map<String, Long>> counters() {
87 return checkOpen(() -> proxy.counters());
88 }
89
90 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070091 public CompletableFuture<Integer> mapSize(String mapName) {
92 return checkOpen(() -> proxy.mapSize(mapName));
Madan Jampanif1b8e172015-03-23 11:42:02 -070093 }
Madan Jampani94c23532015-02-05 17:40:01 -080094
Madan Jampanif1b8e172015-03-23 11:42:02 -070095 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070096 public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
97 return checkOpen(() -> proxy.mapIsEmpty(mapName));
Madan Jampanif1b8e172015-03-23 11:42:02 -070098 }
Madan Jampani94c23532015-02-05 17:40:01 -080099
Madan Jampanif1b8e172015-03-23 11:42:02 -0700100 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700101 public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
102 return checkOpen(() -> proxy.mapContainsKey(mapName, key));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700103 }
Madan Jampani94c23532015-02-05 17:40:01 -0800104
Madan Jampanif1b8e172015-03-23 11:42:02 -0700105 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700106 public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
107 return checkOpen(() -> proxy.mapContainsValue(mapName, value));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700108 }
Madan Jampani94c23532015-02-05 17:40:01 -0800109
Madan Jampanif1b8e172015-03-23 11:42:02 -0700110 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700111 public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
112 return checkOpen(() -> proxy.mapGet(mapName, key));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700113 }
Madan Jampani94c23532015-02-05 17:40:01 -0800114
Madan Jampanif1b8e172015-03-23 11:42:02 -0700115 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700116 public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
117 String mapName, String key, Match<byte[]> valueMatch, Match<Long> versionMatch, byte[] value) {
118 return checkOpen(() -> proxy.mapUpdate(mapName, key, valueMatch, versionMatch, value));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700119 }
Madan Jampani94c23532015-02-05 17:40:01 -0800120
Madan Jampanif1b8e172015-03-23 11:42:02 -0700121 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700122 public CompletableFuture<Result<Void>> mapClear(String mapName) {
123 return checkOpen(() -> proxy.mapClear(mapName));
Madan Jampani346d4f52015-05-04 11:09:39 -0700124 }
125
126 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700127 public CompletableFuture<Set<String>> mapKeySet(String mapName) {
128 return checkOpen(() -> proxy.mapKeySet(mapName));
Madan Jampani346d4f52015-05-04 11:09:39 -0700129 }
130
131 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700132 public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
133 return checkOpen(() -> proxy.mapValues(mapName));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700134 }
Madan Jampani94c23532015-02-05 17:40:01 -0800135
Madan Jampanif1b8e172015-03-23 11:42:02 -0700136 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700137 public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
138 return checkOpen(() -> proxy.mapEntrySet(mapName));
Madan Jampani346d4f52015-05-04 11:09:39 -0700139 }
140
141 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700142 public CompletableFuture<Long> counterGet(String counterName) {
143 return checkOpen(() -> proxy.counterGet(counterName));
Madan Jampanib5d72d52015-04-03 16:53:50 -0700144 }
145
146 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700147 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
148 return checkOpen(() -> proxy.counterAddAndGet(counterName, delta));
149 }
150
151 @Override
152 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
153 return checkOpen(() -> proxy.counterGetAndAdd(counterName, delta));
Madan Jampanib5d72d52015-04-03 16:53:50 -0700154 }
155
156 @Override
andreafd912ac2015-10-02 14:58:35 -0700157 public CompletableFuture<Void> counterSet(String counterName, long value) {
158 return checkOpen(() -> proxy.counterSet(counterName, value));
159 }
160
161 @Override
Aaron Kruglikov82fd6322015-10-06 12:02:46 -0700162 public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) {
163 return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update));
164 }
165
166 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700167 public CompletableFuture<Long> queueSize(String queueName) {
168 return checkOpen(() -> proxy.queueSize(queueName));
169 }
170
171 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700172 public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
Madan Jampani63c659f2015-06-11 00:52:58 -0700173 return checkOpen(() -> proxy.queuePush(queueName, entry));
174 }
175
176 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700177 public CompletableFuture<byte[]> queuePop(String queueName) {
178 return checkOpen(() -> proxy.queuePop(queueName));
Madan Jampani63c659f2015-06-11 00:52:58 -0700179 }
180
181 @Override
182 public CompletableFuture<byte[]> queuePeek(String queueName) {
183 return checkOpen(() -> proxy.queuePeek(queueName));
184 }
185
186 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700187 public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700188 return checkOpen(() -> proxy.prepareAndCommit(transaction));
189 }
190
191 @Override
192 public CompletableFuture<Boolean> prepare(Transaction transaction) {
193 return checkOpen(() -> proxy.prepare(transaction));
194 }
195
196 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700197 public CompletableFuture<CommitResponse> commit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700198 return checkOpen(() -> proxy.commit(transaction));
199 }
200
201 @Override
202 public CompletableFuture<Boolean> rollback(Transaction transaction) {
203 return checkOpen(() -> proxy.rollback(transaction));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700204 }
Madan Jampani94c23532015-02-05 17:40:01 -0800205
Madan Jampanif1b8e172015-03-23 11:42:02 -0700206 @Override
207 @SuppressWarnings("unchecked")
208 public synchronized CompletableFuture<Database> open() {
209 return runStartupTasks()
210 .thenCompose(v -> stateMachine.open())
211 .thenRun(() -> {
212 this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
213 })
214 .thenApply(v -> null);
215 }
216
217 @Override
218 public synchronized CompletableFuture<Void> close() {
219 proxy = null;
220 return stateMachine.close()
221 .thenCompose(v -> runShutdownTasks());
222 }
223
224 @Override
225 public int hashCode() {
226 return name().hashCode();
227 }
228
229 @Override
230 public boolean equals(Object other) {
231 if (other instanceof Database) {
232 return name().equals(((Database) other).name());
233 }
234 return false;
235 }
Madan Jampani648451f2015-07-21 22:09:05 -0700236
237 @Override
238 public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
239 consumers.add(consumer);
240 }
241
242 @Override
243 public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
244 consumers.remove(consumer);
245 }
246
Madan Jampani648451f2015-07-21 22:09:05 -0700247 private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
248 @Override
249 public void accept(String name, Object input, Object output) {
250 StateMachineUpdate update = new StateMachineUpdate(name, input, output);
251 consumers.forEach(consumer -> consumer.accept(update));
252 }
253 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700254}