blob: 0ddaab6e6cb3cddae4de57c9851331f9de467c03 [file] [log] [blame]
Madan Jampani94c23532015-02-05 17:40:01 -08001package org.onosproject.store.consistent.impl;
2
3import net.kuujo.copycat.resource.internal.ResourceContext;
4import net.kuujo.copycat.state.StateMachine;
5import net.kuujo.copycat.resource.internal.AbstractResource;
6import net.kuujo.copycat.state.internal.DefaultStateMachine;
7import net.kuujo.copycat.util.concurrent.Futures;
8
9import java.util.Collection;
10import java.util.List;
11import java.util.Map;
12import java.util.Set;
13import java.util.concurrent.CompletableFuture;
14import java.util.function.Supplier;
15
Madan Jampani393e0f02015-02-12 07:35:39 +053016import org.onosproject.store.service.UpdateOperation;
17import org.onosproject.store.service.Versioned;
18
Madan Jampani94c23532015-02-05 17:40:01 -080019/**
20 * Default database.
21 */
22public class DefaultDatabase extends AbstractResource<Database> implements Database {
23 private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
24 private DatabaseProxy<String, byte[]> proxy;
25
26 @SuppressWarnings("unchecked")
27 public DefaultDatabase(ResourceContext context) {
28 super(context);
29 this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
30 }
31
32 /**
33 * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
34 * return the completed future result.
35 *
36 * @param supplier The supplier to call if the database is open.
37 * @param <T> The future result type.
38 * @return A completable future that if this database is closed is immediately failed.
39 */
40 protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
41 if (proxy == null) {
42 return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
43 }
44 return supplier.get();
45 }
46
47 @Override
48 public CompletableFuture<Integer> size(String tableName) {
49 return checkOpen(() -> proxy.size(tableName));
50 }
51
52 @Override
53 public CompletableFuture<Boolean> isEmpty(String tableName) {
54 return checkOpen(() -> proxy.isEmpty(tableName));
55 }
56
57 @Override
58 public CompletableFuture<Boolean> containsKey(String tableName, String key) {
59 return checkOpen(() -> proxy.containsKey(tableName, key));
60 }
61
62 @Override
63 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
64 return checkOpen(() -> proxy.containsValue(tableName, value));
65 }
66
67 @Override
68 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
69 return checkOpen(() -> proxy.get(tableName, key));
70 }
71
72 @Override
73 public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
74 return checkOpen(() -> proxy.put(tableName, key, value));
75 }
76
77 @Override
78 public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
79 return checkOpen(() -> proxy.remove(tableName, key));
80 }
81
82 @Override
83 public CompletableFuture<Void> clear(String tableName) {
84 return checkOpen(() -> proxy.clear(tableName));
85 }
86
87 @Override
88 public CompletableFuture<Set<String>> keySet(String tableName) {
89 return checkOpen(() -> proxy.keySet(tableName));
90 }
91
92 @Override
93 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
94 return checkOpen(() -> proxy.values(tableName));
95 }
96
97 @Override
98 public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
99 return checkOpen(() -> proxy.entrySet(tableName));
100 }
101
102 @Override
103 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
104 return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
105 }
106
107 @Override
108 public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
109 return checkOpen(() -> proxy.remove(tableName, key, value));
110 }
111
112 @Override
113 public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
114 return checkOpen(() -> proxy.remove(tableName, key, version));
115 }
116
117 @Override
118 public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
119 return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
120 }
121
122 @Override
123 public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
124 return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
125 }
126
127 @Override
128 public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
129 return checkOpen(() -> proxy.atomicBatchUpdate(updates));
130 }
131
132 @Override
133 @SuppressWarnings("unchecked")
134 public synchronized CompletableFuture<Database> open() {
135 return runStartupTasks()
136 .thenCompose(v -> stateMachine.open())
137 .thenRun(() -> {
Madan Jampani393e0f02015-02-12 07:35:39 +0530138 this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
Madan Jampani94c23532015-02-05 17:40:01 -0800139 })
140 .thenApply(v -> null);
141 }
142
143 @Override
144 public synchronized CompletableFuture<Void> close() {
145 proxy = null;
146 return stateMachine.close()
147 .thenCompose(v -> runShutdownTasks());
148 }
149}