// blob: 837f3b4012d87e52ef873fa9f4b56b4e6c19be6f [file] [log] [blame]
// Madan Jampani 94c2353 2015-02-05 17:40:01 -0800
package org.onosproject.store.consistent.impl;
2
3import net.kuujo.copycat.resource.internal.ResourceContext;
4import net.kuujo.copycat.state.StateMachine;
5import net.kuujo.copycat.resource.internal.AbstractResource;
6import net.kuujo.copycat.state.internal.DefaultStateMachine;
7import net.kuujo.copycat.util.concurrent.Futures;
8
9import java.util.Collection;
10import java.util.List;
11import java.util.Map;
12import java.util.Set;
13import java.util.concurrent.CompletableFuture;
14import java.util.function.Supplier;
15
16/**
17 * Default database.
18 */
19public class DefaultDatabase extends AbstractResource<Database> implements Database {
20 private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
21 private DatabaseProxy<String, byte[]> proxy;
22
23 @SuppressWarnings("unchecked")
24 public DefaultDatabase(ResourceContext context) {
25 super(context);
26 this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
27 }
28
29 /**
30 * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
31 * return the completed future result.
32 *
33 * @param supplier The supplier to call if the database is open.
34 * @param <T> The future result type.
35 * @return A completable future that if this database is closed is immediately failed.
36 */
37 protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
38 if (proxy == null) {
39 return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
40 }
41 return supplier.get();
42 }
43
44 @Override
45 public CompletableFuture<Integer> size(String tableName) {
46 return checkOpen(() -> proxy.size(tableName));
47 }
48
49 @Override
50 public CompletableFuture<Boolean> isEmpty(String tableName) {
51 return checkOpen(() -> proxy.isEmpty(tableName));
52 }
53
54 @Override
55 public CompletableFuture<Boolean> containsKey(String tableName, String key) {
56 return checkOpen(() -> proxy.containsKey(tableName, key));
57 }
58
59 @Override
60 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
61 return checkOpen(() -> proxy.containsValue(tableName, value));
62 }
63
64 @Override
65 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
66 return checkOpen(() -> proxy.get(tableName, key));
67 }
68
69 @Override
70 public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
71 return checkOpen(() -> proxy.put(tableName, key, value));
72 }
73
74 @Override
75 public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
76 return checkOpen(() -> proxy.remove(tableName, key));
77 }
78
79 @Override
80 public CompletableFuture<Void> clear(String tableName) {
81 return checkOpen(() -> proxy.clear(tableName));
82 }
83
84 @Override
85 public CompletableFuture<Set<String>> keySet(String tableName) {
86 return checkOpen(() -> proxy.keySet(tableName));
87 }
88
89 @Override
90 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
91 return checkOpen(() -> proxy.values(tableName));
92 }
93
94 @Override
95 public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
96 return checkOpen(() -> proxy.entrySet(tableName));
97 }
98
99 @Override
100 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
101 return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
102 }
103
104 @Override
105 public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
106 return checkOpen(() -> proxy.remove(tableName, key, value));
107 }
108
109 @Override
110 public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
111 return checkOpen(() -> proxy.remove(tableName, key, version));
112 }
113
114 @Override
115 public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
116 return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
117 }
118
119 @Override
120 public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
121 return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
122 }
123
124 @Override
125 public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
126 return checkOpen(() -> proxy.atomicBatchUpdate(updates));
127 }
128
129 @Override
130 @SuppressWarnings("unchecked")
131 public synchronized CompletableFuture<Database> open() {
132 return runStartupTasks()
133 .thenCompose(v -> stateMachine.open())
134 .thenRun(() -> {
135 this.proxy = stateMachine.createProxy(DatabaseProxy.class);
136 })
137 .thenApply(v -> null);
138 }
139
140 @Override
141 public synchronized CompletableFuture<Void> close() {
142 proxy = null;
143 return stateMachine.close()
144 .thenCompose(v -> runShutdownTasks());
145 }
146}