/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Madan Jampani94c23532015-02-05 17:40:01 -080017package org.onosproject.store.consistent.impl;
18
import net.kuujo.copycat.resource.internal.AbstractResource;
import net.kuujo.copycat.resource.internal.ResourceManager;
import net.kuujo.copycat.state.StateMachine;
import net.kuujo.copycat.state.internal.DefaultStateMachine;
import net.kuujo.copycat.util.concurrent.Futures;
import net.kuujo.copycat.util.function.TriConsumer;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;

import com.google.common.collect.Sets;
37
Madan Jampani94c23532015-02-05 17:40:01 -080038/**
39 * Default database.
40 */
41public class DefaultDatabase extends AbstractResource<Database> implements Database {
Madan Jampanif1b8e172015-03-23 11:42:02 -070042 private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
43 private DatabaseProxy<String, byte[]> proxy;
Madan Jampani648451f2015-07-21 22:09:05 -070044 private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
45 private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
Madan Jampani94c23532015-02-05 17:40:01 -080046
Madan Jampanibff6d8f2015-03-31 16:53:47 -070047 @SuppressWarnings({ "unchecked", "rawtypes" })
Madan Jampanied1b7fc2015-04-27 23:30:07 -070048 public DefaultDatabase(ResourceManager context) {
Madan Jampanif1b8e172015-03-23 11:42:02 -070049 super(context);
Madan Jampani40537ca2015-07-14 19:50:33 -070050 this.stateMachine = new DefaultStateMachine(context,
51 DatabaseState.class,
52 DefaultDatabaseState.class,
53 DefaultDatabase.class.getClassLoader());
Madan Jampani648451f2015-07-21 22:09:05 -070054 this.stateMachine.addStartupTask(() -> {
55 stateMachine.registerWatcher(watcher);
56 return CompletableFuture.completedFuture(null);
57 });
58 this.stateMachine.addShutdownTask(() -> {
59 stateMachine.unregisterWatcher(watcher);
60 return CompletableFuture.completedFuture(null);
61 });
Madan Jampani94c23532015-02-05 17:40:01 -080062 }
Madan Jampani94c23532015-02-05 17:40:01 -080063
Madan Jampanif1b8e172015-03-23 11:42:02 -070064 /**
65 * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
66 * return the completed future result.
67 *
68 * @param supplier The supplier to call if the database is open.
69 * @param <T> The future result type.
70 * @return A completable future that if this database is closed is immediately failed.
71 */
72 protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
73 if (proxy == null) {
74 return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
75 }
76 return supplier.get();
77 }
Madan Jampani94c23532015-02-05 17:40:01 -080078
Madan Jampanif1b8e172015-03-23 11:42:02 -070079 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070080 public CompletableFuture<Set<String>> maps() {
81 return checkOpen(() -> proxy.maps());
Madan Jampania89f8f92015-04-01 14:39:54 -070082 }
83
84 @Override
Madan Jampanib5d72d52015-04-03 16:53:50 -070085 public CompletableFuture<Map<String, Long>> counters() {
86 return checkOpen(() -> proxy.counters());
87 }
88
89 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070090 public CompletableFuture<Integer> mapSize(String mapName) {
91 return checkOpen(() -> proxy.mapSize(mapName));
Madan Jampanif1b8e172015-03-23 11:42:02 -070092 }
Madan Jampani94c23532015-02-05 17:40:01 -080093
Madan Jampanif1b8e172015-03-23 11:42:02 -070094 @Override
Madan Jampani7804c992015-07-20 13:20:19 -070095 public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
96 return checkOpen(() -> proxy.mapIsEmpty(mapName));
Madan Jampanif1b8e172015-03-23 11:42:02 -070097 }
Madan Jampani94c23532015-02-05 17:40:01 -080098
Madan Jampanif1b8e172015-03-23 11:42:02 -070099 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700100 public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
101 return checkOpen(() -> proxy.mapContainsKey(mapName, key));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700102 }
Madan Jampani94c23532015-02-05 17:40:01 -0800103
Madan Jampanif1b8e172015-03-23 11:42:02 -0700104 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700105 public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
106 return checkOpen(() -> proxy.mapContainsValue(mapName, value));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700107 }
Madan Jampani94c23532015-02-05 17:40:01 -0800108
Madan Jampanif1b8e172015-03-23 11:42:02 -0700109 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700110 public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
111 return checkOpen(() -> proxy.mapGet(mapName, key));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700112 }
Madan Jampani94c23532015-02-05 17:40:01 -0800113
Madan Jampanif1b8e172015-03-23 11:42:02 -0700114 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700115 public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
116 String mapName, String key, Match<byte[]> valueMatch, Match<Long> versionMatch, byte[] value) {
117 return checkOpen(() -> proxy.mapUpdate(mapName, key, valueMatch, versionMatch, value));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700118 }
Madan Jampani94c23532015-02-05 17:40:01 -0800119
Madan Jampanif1b8e172015-03-23 11:42:02 -0700120 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700121 public CompletableFuture<Result<Void>> mapClear(String mapName) {
122 return checkOpen(() -> proxy.mapClear(mapName));
Madan Jampani346d4f52015-05-04 11:09:39 -0700123 }
124
125 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700126 public CompletableFuture<Set<String>> mapKeySet(String mapName) {
127 return checkOpen(() -> proxy.mapKeySet(mapName));
Madan Jampani346d4f52015-05-04 11:09:39 -0700128 }
129
130 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700131 public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
132 return checkOpen(() -> proxy.mapValues(mapName));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700133 }
Madan Jampani94c23532015-02-05 17:40:01 -0800134
Madan Jampanif1b8e172015-03-23 11:42:02 -0700135 @Override
Madan Jampani7804c992015-07-20 13:20:19 -0700136 public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
137 return checkOpen(() -> proxy.mapEntrySet(mapName));
Madan Jampani346d4f52015-05-04 11:09:39 -0700138 }
139
140 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700141 public CompletableFuture<Long> counterGet(String counterName) {
142 return checkOpen(() -> proxy.counterGet(counterName));
Madan Jampanib5d72d52015-04-03 16:53:50 -0700143 }
144
145 @Override
Madan Jampani04aeb452015-05-02 16:12:24 -0700146 public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
147 return checkOpen(() -> proxy.counterAddAndGet(counterName, delta));
148 }
149
150 @Override
151 public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
152 return checkOpen(() -> proxy.counterGetAndAdd(counterName, delta));
Madan Jampanib5d72d52015-04-03 16:53:50 -0700153 }
154
155 @Override
Madan Jampani63c659f2015-06-11 00:52:58 -0700156 public CompletableFuture<Long> queueSize(String queueName) {
157 return checkOpen(() -> proxy.queueSize(queueName));
158 }
159
160 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700161 public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
Madan Jampani63c659f2015-06-11 00:52:58 -0700162 return checkOpen(() -> proxy.queuePush(queueName, entry));
163 }
164
165 @Override
Madan Jampania6d787b2015-08-11 11:02:02 -0700166 public CompletableFuture<byte[]> queuePop(String queueName) {
167 return checkOpen(() -> proxy.queuePop(queueName));
Madan Jampani63c659f2015-06-11 00:52:58 -0700168 }
169
170 @Override
171 public CompletableFuture<byte[]> queuePeek(String queueName) {
172 return checkOpen(() -> proxy.queuePeek(queueName));
173 }
174
175 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700176 public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700177 return checkOpen(() -> proxy.prepareAndCommit(transaction));
178 }
179
180 @Override
181 public CompletableFuture<Boolean> prepare(Transaction transaction) {
182 return checkOpen(() -> proxy.prepare(transaction));
183 }
184
185 @Override
Madan Jampanibab51a42015-08-10 13:53:35 -0700186 public CompletableFuture<CommitResponse> commit(Transaction transaction) {
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700187 return checkOpen(() -> proxy.commit(transaction));
188 }
189
190 @Override
191 public CompletableFuture<Boolean> rollback(Transaction transaction) {
192 return checkOpen(() -> proxy.rollback(transaction));
Madan Jampanif1b8e172015-03-23 11:42:02 -0700193 }
Madan Jampani94c23532015-02-05 17:40:01 -0800194
Madan Jampanif1b8e172015-03-23 11:42:02 -0700195 @Override
196 @SuppressWarnings("unchecked")
197 public synchronized CompletableFuture<Database> open() {
198 return runStartupTasks()
199 .thenCompose(v -> stateMachine.open())
200 .thenRun(() -> {
201 this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
202 })
203 .thenApply(v -> null);
204 }
205
206 @Override
207 public synchronized CompletableFuture<Void> close() {
208 proxy = null;
209 return stateMachine.close()
210 .thenCompose(v -> runShutdownTasks());
211 }
212
213 @Override
214 public int hashCode() {
215 return name().hashCode();
216 }
217
218 @Override
219 public boolean equals(Object other) {
220 if (other instanceof Database) {
221 return name().equals(((Database) other).name());
222 }
223 return false;
224 }
Madan Jampani648451f2015-07-21 22:09:05 -0700225
226 @Override
227 public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
228 consumers.add(consumer);
229 }
230
231 @Override
232 public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
233 consumers.remove(consumer);
234 }
235
Madan Jampani648451f2015-07-21 22:09:05 -0700236 private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
237 @Override
238 public void accept(String name, Object input, Object output) {
239 StateMachineUpdate update = new StateMachineUpdate(name, input, output);
240 consumers.forEach(consumer -> consumer.accept(update));
241 }
242 }
Madan Jampanibff6d8f2015-03-31 16:53:47 -0700243}