blob: 2a50fbd6b21efa90b1f4d95b1e3a5dec031da712 [file] [log] [blame]
Madan Jampani25461112015-02-17 14:17:29 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070019import com.google.common.collect.Sets;
Madan Jampani94c23532015-02-05 17:40:01 -080020import net.kuujo.copycat.resource.internal.AbstractResource;
Madan Jampanied1b7fc2015-04-27 23:30:07 -070021import net.kuujo.copycat.resource.internal.ResourceManager;
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070022import net.kuujo.copycat.state.StateMachine;
Madan Jampani94c23532015-02-05 17:40:01 -080023import net.kuujo.copycat.state.internal.DefaultStateMachine;
24import net.kuujo.copycat.util.concurrent.Futures;
Madan Jampani648451f2015-07-21 22:09:05 -070025import net.kuujo.copycat.util.function.TriConsumer;
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070026import org.onosproject.store.service.Transaction;
27import org.onosproject.store.service.Versioned;
Madan Jampani94c23532015-02-05 17:40:01 -080028
29import java.util.Collection;
Madan Jampani94c23532015-02-05 17:40:01 -080030import java.util.Map;
31import java.util.Set;
32import java.util.concurrent.CompletableFuture;
Madan Jampani648451f2015-07-21 22:09:05 -070033import java.util.function.Consumer;
Madan Jampani94c23532015-02-05 17:40:01 -080034import java.util.function.Supplier;
35
36/**
37 * Default database.
38 */
39public class DefaultDatabase extends AbstractResource<Database> implements Database {
Madan Jampanif1b8e172015-03-23 11:42:02 -070040 private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
41 private DatabaseProxy<String, byte[]> proxy;
Madan Jampani648451f2015-07-21 22:09:05 -070042 private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
43 private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
Madan Jampani94c23532015-02-05 17:40:01 -080044
andreafd912ac2015-10-02 14:58:35 -070045 @SuppressWarnings({"unchecked", "rawtypes"})
Madan Jampanied1b7fc2015-04-27 23:30:07 -070046 public DefaultDatabase(ResourceManager context) {
Madan Jampanif1b8e172015-03-23 11:42:02 -070047 super(context);
Madan Jampani40537ca2015-07-14 19:50:33 -070048 this.stateMachine = new DefaultStateMachine(context,
Aaron Kruglikov82fd6322015-10-06 12:02:46 -070049 DatabaseState.class,
50 DefaultDatabaseState.class,
51 DefaultDatabase.class.getClassLoader());
Madan Jampani648451f2015-07-21 22:09:05 -070052 this.stateMachine.addStartupTask(() -> {
53 stateMachine.registerWatcher(watcher);
54 return CompletableFuture.completedFuture(null);
55 });
56 this.stateMachine.addShutdownTask(() -> {
57 stateMachine.unregisterWatcher(watcher);
58 return CompletableFuture.completedFuture(null);
59 });
Madan Jampani94c23532015-02-05 17:40:01 -080060 }
Madan Jampani94c23532015-02-05 17:40:01 -080061
Madan Jampanif1b8e172015-03-23 11:42:02 -070062 /**
63 * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
64 * return the completed future result.
65 *
66 * @param supplier The supplier to call if the database is open.
andreafd912ac2015-10-02 14:58:35 -070067 * @param <T> The future result type.
Madan Jampanif1b8e172015-03-23 11:42:02 -070068 * @return A completable future that if this database is closed is immediately failed.
69 */
70 protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
71 if (proxy == null) {
72 return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
73 }
74 return supplier.get();
75 }
Madan Jampani94c23532015-02-05 17:40:01 -080076
Madan Jampanif1b8e172015-03-23 11:42:02 -070077 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070078 public CompletableFuture<Set<String>> maps() {
79 return checkOpen(() -> proxy.maps());
Madan Jampania89f8f92015-04-01 14:39:54 -070080 }
81
82 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070083 public CompletableFuture<Map<String, Long>> counters() {
84 return checkOpen(() -> proxy.counters());
85 }
86
87 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070088 public CompletableFuture<Integer> mapSize(String mapName) {
89 return checkOpen(() -> proxy.mapSize(mapName));
Madan Jampanif1b8e172015-03-23 11:42:02 -070090 }
Madan Jampani94c23532015-02-05 17:40:01 -080091
Madan Jampanif1b8e172015-03-23 11:42:02 -070092 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070093 public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
94 return checkOpen(() -> proxy.mapIsEmpty(mapName));
Madan Jampanif1b8e172015-03-23 11:42:02 -070095 }
Madan Jampani94c23532015-02-05 17:40:01 -080096
Madan Jampanif1b8e172015-03-23 11:42:02 -070097 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070098 public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
99 return checkOpen(() -> proxy.mapContainsKey(mapName, key));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700100 }
Madan Jampani94c23532015-02-05 17:40:01 -0800101
Madan Jampanif1b8e172015-03-23 11:42:02 -0700102 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700103 public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
104 return checkOpen(() -> proxy.mapContainsValue(mapName, value));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700105 }
Madan Jampani94c23532015-02-05 17:40:01 -0800106
Madan Jampanif1b8e172015-03-23 11:42:02 -0700107 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700108 public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
109 return checkOpen(() -> proxy.mapGet(mapName, key));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700110 }
Madan Jampani94c23532015-02-05 17:40:01 -0800111
Madan Jampanif1b8e172015-03-23 11:42:02 -0700112 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700113 public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
114 String mapName, String key, Match<byte[]> valueMatch, Match<Long> versionMatch, byte[] value) {
115 return checkOpen(() -> proxy.mapUpdate(mapName, key, valueMatch, versionMatch, value));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700116 }
Madan Jampani94c23532015-02-05 17:40:01 -0800117
Madan Jampanif1b8e172015-03-23 11:42:02 -0700118 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700119 public CompletableFuture<Result<Void>> mapClear(String mapName) {
120 return checkOpen(() -> proxy.mapClear(mapName));
Madan Jampani346d4f52015-05-04 11:09:39 -0700121 }
122
123 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700124 public CompletableFuture<Set<String>> mapKeySet(String mapName) {
125 return checkOpen(() -> proxy.mapKeySet(mapName));
Madan Jampani346d4f52015-05-04 11:09:39 -0700126 }
127
128 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700129 public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
130 return checkOpen(() -> proxy.mapValues(mapName));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700131 }
Madan Jampani94c23532015-02-05 17:40:01 -0800132
Madan Jampanif1b8e172015-03-23 11:42:02 -0700133 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700134 public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
135 return checkOpen(() -> proxy.mapEntrySet(mapName));
Madan Jampani346d4f52015-05-04 11:09:39 -0700136 }
137
138 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700139 public CompletableFuture<Long> counterGet(String counterName) {
140 return checkOpen(() -> proxy.counterGet(counterName));
Madan Jampanib5d72d52015-04-03 16:53:50 -0700141 }
142
143 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700144 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
145 return checkOpen(() -> proxy.counterAddAndGet(counterName, delta));
146 }
147
148 @Override
149 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
150 return checkOpen(() -> proxy.counterGetAndAdd(counterName, delta));
Madan Jampanib5d72d52015-04-03 16:53:50 -0700151 }
152
153 @Override
andreafd912ac2015-10-02 14:58:35 -0700154 public CompletableFuture<Void> counterSet(String counterName, long value) {
155 return checkOpen(() -> proxy.counterSet(counterName, value));
156 }
157
158 @Override
Aaron Kruglikov82fd6322015-10-06 12:02:46 -0700159 public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) {
160 return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update));
161 }
162
163 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700164 public CompletableFuture<Long> queueSize(String queueName) {
165 return checkOpen(() -> proxy.queueSize(queueName));
166 }
167
168 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700169 public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
Madan Jampani63c659f2015-06-11 00:52:58 -0700170 return checkOpen(() -> proxy.queuePush(queueName, entry));
171 }
172
173 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700174 public CompletableFuture<byte[]> queuePop(String queueName) {
175 return checkOpen(() -> proxy.queuePop(queueName));
Madan Jampani63c659f2015-06-11 00:52:58 -0700176 }
177
178 @Override
179 public CompletableFuture<byte[]> queuePeek(String queueName) {
180 return checkOpen(() -> proxy.queuePeek(queueName));
181 }
182
183 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700184 public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700185 return checkOpen(() -> proxy.prepareAndCommit(transaction));
186 }
187
188 @Override
189 public CompletableFuture<Boolean> prepare(Transaction transaction) {
190 return checkOpen(() -> proxy.prepare(transaction));
191 }
192
193 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700194 public CompletableFuture<CommitResponse> commit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700195 return checkOpen(() -> proxy.commit(transaction));
196 }
197
198 @Override
199 public CompletableFuture<Boolean> rollback(Transaction transaction) {
200 return checkOpen(() -> proxy.rollback(transaction));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700201 }
Madan Jampani94c23532015-02-05 17:40:01 -0800202
Madan Jampanif1b8e172015-03-23 11:42:02 -0700203 @Override
204 @SuppressWarnings("unchecked")
205 public synchronized CompletableFuture<Database> open() {
206 return runStartupTasks()
207 .thenCompose(v -> stateMachine.open())
208 .thenRun(() -> {
209 this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
210 })
211 .thenApply(v -> null);
212 }
213
214 @Override
215 public synchronized CompletableFuture<Void> close() {
216 proxy = null;
217 return stateMachine.close()
218 .thenCompose(v -> runShutdownTasks());
219 }
220
221 @Override
222 public int hashCode() {
223 return name().hashCode();
224 }
225
226 @Override
227 public boolean equals(Object other) {
228 if (other instanceof Database) {
229 return name().equals(((Database) other).name());
230 }
231 return false;
232 }
Madan Jampani648451f2015-07-21 22:09:05 -0700233
234 @Override
235 public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
236 consumers.add(consumer);
237 }
238
239 @Override
240 public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
241 consumers.remove(consumer);
242 }
243
Madan Jampani648451f2015-07-21 22:09:05 -0700244 private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
245 @Override
246 public void accept(String name, Object input, Object output) {
247 StateMachineUpdate update = new StateMachineUpdate(name, input, output);
248 consumers.forEach(consumer -> consumer.accept(update));
249 }
250 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700251}