blob: e4de2964ad1c59fa5abbbfc618f2c5f034a91d08 [file] [log] [blame]
Madan Jampani25461112015-02-17 14:17:29 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Madan Jampanif4c88502016-01-21 12:35:36 -080017package org.onosproject.store.primitives.impl;
Madan Jampani94c23532015-02-05 17:40:01 -080018
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070019import com.google.common.collect.Sets;
Madan Jampanif2f086c2016-01-13 16:15:39 -080020
Madan Jampani94c23532015-02-05 17:40:01 -080021import net.kuujo.copycat.resource.internal.AbstractResource;
Madan Jampanied1b7fc2015-04-27 23:30:07 -070022import net.kuujo.copycat.resource.internal.ResourceManager;
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070023import net.kuujo.copycat.state.StateMachine;
Madan Jampani94c23532015-02-05 17:40:01 -080024import net.kuujo.copycat.state.internal.DefaultStateMachine;
25import net.kuujo.copycat.util.concurrent.Futures;
Madan Jampani648451f2015-07-21 22:09:05 -070026import net.kuujo.copycat.util.function.TriConsumer;
Madan Jampanif2f086c2016-01-13 16:15:39 -080027
28import org.onlab.util.Match;
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070029import org.onosproject.store.service.Versioned;
Madan Jampani94c23532015-02-05 17:40:01 -080030
31import java.util.Collection;
Madan Jampani94c23532015-02-05 17:40:01 -080032import java.util.Map;
33import java.util.Set;
34import java.util.concurrent.CompletableFuture;
Madan Jampani648451f2015-07-21 22:09:05 -070035import java.util.function.Consumer;
Madan Jampani94c23532015-02-05 17:40:01 -080036import java.util.function.Supplier;
37
38/**
39 * Default database.
40 */
41public class DefaultDatabase extends AbstractResource<Database> implements Database {
Madan Jampanif1b8e172015-03-23 11:42:02 -070042 private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
43 private DatabaseProxy<String, byte[]> proxy;
Madan Jampani648451f2015-07-21 22:09:05 -070044 private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
45 private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
Madan Jampani94c23532015-02-05 17:40:01 -080046
andreafd912ac2015-10-02 14:58:35 -070047 @SuppressWarnings({"unchecked", "rawtypes"})
Madan Jampanied1b7fc2015-04-27 23:30:07 -070048 public DefaultDatabase(ResourceManager context) {
Madan Jampanif1b8e172015-03-23 11:42:02 -070049 super(context);
Madan Jampani40537ca2015-07-14 19:50:33 -070050 this.stateMachine = new DefaultStateMachine(context,
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070051 DatabaseState.class,
52 DefaultDatabaseState.class,
53 DefaultDatabase.class.getClassLoader());
Madan Jampani648451f2015-07-21 22:09:05 -070054 this.stateMachine.addStartupTask(() -> {
55 stateMachine.registerWatcher(watcher);
56 return CompletableFuture.completedFuture(null);
57 });
58 this.stateMachine.addShutdownTask(() -> {
59 stateMachine.unregisterWatcher(watcher);
60 return CompletableFuture.completedFuture(null);
61 });
Madan Jampani94c23532015-02-05 17:40:01 -080062 }
Madan Jampani94c23532015-02-05 17:40:01 -080063
Madan Jampanif1b8e172015-03-23 11:42:02 -070064 /**
65 * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
66 * return the completed future result.
67 *
68 * @param supplier The supplier to call if the database is open.
andreafd912ac2015-10-02 14:58:35 -070069 * @param <T> The future result type.
Madan Jampanif1b8e172015-03-23 11:42:02 -070070 * @return A completable future that if this database is closed is immediately failed.
71 */
72 protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
73 if (proxy == null) {
74 return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
75 }
76 return supplier.get();
77 }
Madan Jampani94c23532015-02-05 17:40:01 -080078
Madan Jampanif1b8e172015-03-23 11:42:02 -070079 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070080 public CompletableFuture<Set<String>> maps() {
81 return checkOpen(() -> proxy.maps());
Madan Jampania89f8f92015-04-01 14:39:54 -070082 }
83
84 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070085 public CompletableFuture<Map<String, Long>> counters() {
86 return checkOpen(() -> proxy.counters());
87 }
88
89 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070090 public CompletableFuture<Integer> mapSize(String mapName) {
91 return checkOpen(() -> proxy.mapSize(mapName));
Madan Jampanif1b8e172015-03-23 11:42:02 -070092 }
Madan Jampani94c23532015-02-05 17:40:01 -080093
Madan Jampanif1b8e172015-03-23 11:42:02 -070094 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070095 public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
96 return checkOpen(() -> proxy.mapIsEmpty(mapName));
Madan Jampanif1b8e172015-03-23 11:42:02 -070097 }
Madan Jampani94c23532015-02-05 17:40:01 -080098
Madan Jampanif1b8e172015-03-23 11:42:02 -070099 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700100 public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
101 return checkOpen(() -> proxy.mapContainsKey(mapName, key));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700102 }
Madan Jampani94c23532015-02-05 17:40:01 -0800103
Madan Jampanif1b8e172015-03-23 11:42:02 -0700104 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700105 public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
106 return checkOpen(() -> proxy.mapContainsValue(mapName, value));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700107 }
Madan Jampani94c23532015-02-05 17:40:01 -0800108
Madan Jampanif1b8e172015-03-23 11:42:02 -0700109 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700110 public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
111 return checkOpen(() -> proxy.mapGet(mapName, key));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700112 }
Madan Jampani94c23532015-02-05 17:40:01 -0800113
Madan Jampanif1b8e172015-03-23 11:42:02 -0700114 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700115 public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
116 String mapName, String key, Match<byte[]> valueMatch, Match<Long> versionMatch, byte[] value) {
117 return checkOpen(() -> proxy.mapUpdate(mapName, key, valueMatch, versionMatch, value));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700118 }
Madan Jampani94c23532015-02-05 17:40:01 -0800119
Madan Jampanif1b8e172015-03-23 11:42:02 -0700120 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700121 public CompletableFuture<Result<Void>> mapClear(String mapName) {
122 return checkOpen(() -> proxy.mapClear(mapName));
Madan Jampani346d4f52015-05-04 11:09:39 -0700123 }
124
125 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700126 public CompletableFuture<Set<String>> mapKeySet(String mapName) {
127 return checkOpen(() -> proxy.mapKeySet(mapName));
Madan Jampani346d4f52015-05-04 11:09:39 -0700128 }
129
130 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700131 public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
132 return checkOpen(() -> proxy.mapValues(mapName));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700133 }
Madan Jampani94c23532015-02-05 17:40:01 -0800134
Madan Jampanif1b8e172015-03-23 11:42:02 -0700135 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700136 public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
137 return checkOpen(() -> proxy.mapEntrySet(mapName));
Madan Jampani346d4f52015-05-04 11:09:39 -0700138 }
139
140 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700141 public CompletableFuture<Long> counterGet(String counterName) {
142 return checkOpen(() -> proxy.counterGet(counterName));
Madan Jampanib5d72d52015-04-03 16:53:50 -0700143 }
144
145 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700146 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
147 return checkOpen(() -> proxy.counterAddAndGet(counterName, delta));
148 }
149
150 @Override
151 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
152 return checkOpen(() -> proxy.counterGetAndAdd(counterName, delta));
Madan Jampanib5d72d52015-04-03 16:53:50 -0700153 }
154
155 @Override
andreafd912ac2015-10-02 14:58:35 -0700156 public CompletableFuture<Void> counterSet(String counterName, long value) {
157 return checkOpen(() -> proxy.counterSet(counterName, value));
158 }
159
160 @Override
Aaron Kruglikov82fd6322015-10-06 12:02:46 -0700161 public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) {
162 return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update));
163 }
164
165 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700166 public CompletableFuture<Long> queueSize(String queueName) {
167 return checkOpen(() -> proxy.queueSize(queueName));
168 }
169
170 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700171 public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
Madan Jampani63c659f2015-06-11 00:52:58 -0700172 return checkOpen(() -> proxy.queuePush(queueName, entry));
173 }
174
175 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700176 public CompletableFuture<byte[]> queuePop(String queueName) {
177 return checkOpen(() -> proxy.queuePop(queueName));
Madan Jampani63c659f2015-06-11 00:52:58 -0700178 }
179
180 @Override
181 public CompletableFuture<byte[]> queuePeek(String queueName) {
182 return checkOpen(() -> proxy.queuePeek(queueName));
183 }
184
185 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700186 public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700187 return checkOpen(() -> proxy.prepareAndCommit(transaction));
188 }
189
190 @Override
191 public CompletableFuture<Boolean> prepare(Transaction transaction) {
192 return checkOpen(() -> proxy.prepare(transaction));
193 }
194
195 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700196 public CompletableFuture<CommitResponse> commit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700197 return checkOpen(() -> proxy.commit(transaction));
198 }
199
200 @Override
201 public CompletableFuture<Boolean> rollback(Transaction transaction) {
202 return checkOpen(() -> proxy.rollback(transaction));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700203 }
Madan Jampani94c23532015-02-05 17:40:01 -0800204
Madan Jampanif1b8e172015-03-23 11:42:02 -0700205 @Override
206 @SuppressWarnings("unchecked")
207 public synchronized CompletableFuture<Database> open() {
208 return runStartupTasks()
209 .thenCompose(v -> stateMachine.open())
210 .thenRun(() -> {
211 this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
212 })
213 .thenApply(v -> null);
214 }
215
216 @Override
217 public synchronized CompletableFuture<Void> close() {
218 proxy = null;
219 return stateMachine.close()
220 .thenCompose(v -> runShutdownTasks());
221 }
222
223 @Override
224 public int hashCode() {
225 return name().hashCode();
226 }
227
228 @Override
229 public boolean equals(Object other) {
230 if (other instanceof Database) {
231 return name().equals(((Database) other).name());
232 }
233 return false;
234 }
Madan Jampani648451f2015-07-21 22:09:05 -0700235
236 @Override
237 public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
238 consumers.add(consumer);
239 }
240
241 @Override
242 public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
243 consumers.remove(consumer);
244 }
245
Madan Jampani648451f2015-07-21 22:09:05 -0700246 private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
247 @Override
248 public void accept(String name, Object input, Object output) {
249 StateMachineUpdate update = new StateMachineUpdate(name, input, output);
250 consumers.forEach(consumer -> consumer.accept(update));
251 }
252 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700253}