blob: 72e4b95b489732d705d49ae8ee5e6345da01f16e [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHIf8468442014-11-11 10:09:20 -08003import static com.google.common.base.Preconditions.checkNotNull;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -08004import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf8468442014-11-11 10:09:20 -08005
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -08006import java.util.Collections;
Madan Jampani08822c42014-11-04 17:17:46 -08007import java.util.List;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -08008import java.util.Map;
Madan Jampanif5d263b2014-11-13 10:04:40 -08009import java.util.Set;
Madan Jampani08822c42014-11-04 17:17:46 -080010import java.util.concurrent.CompletableFuture;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080011import java.util.concurrent.CountDownLatch;
Madan Jampani08822c42014-11-04 17:17:46 -080012import java.util.concurrent.ExecutionException;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080013import java.util.concurrent.TimeUnit;
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -080014import java.util.concurrent.TimeoutException;
Madan Jampani08822c42014-11-04 17:17:46 -080015
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080016import net.kuujo.copycat.Copycat;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080017import net.kuujo.copycat.event.EventHandler;
18import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampani08822c42014-11-04 17:17:46 -080019
Madan Jampani12390c12014-11-12 00:35:56 -080020import org.onlab.onos.store.service.BatchReadRequest;
21import org.onlab.onos.store.service.BatchWriteRequest;
Madan Jampani08822c42014-11-04 17:17:46 -080022import org.onlab.onos.store.service.DatabaseException;
Madan Jampani12390c12014-11-12 00:35:56 -080023import org.onlab.onos.store.service.ReadResult;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -080024import org.onlab.onos.store.service.VersionedValue;
Madan Jampani12390c12014-11-12 00:35:56 -080025import org.onlab.onos.store.service.WriteResult;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080026import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080027
Madan Jampani686fa182014-11-04 23:16:27 -080028/**
29 * Client for interacting with the Copycat Raft cluster.
30 */
Madan Jampani08822c42014-11-04 17:17:46 -080031public class DatabaseClient {
32
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -080033 private static final int RETRIES = 5;
34
35 private static final int TIMEOUT_MS = 2000;
36
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080037 private final Logger log = getLogger(getClass());
38
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080039 private final Copycat copycat;
Madan Jampani08822c42014-11-04 17:17:46 -080040
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080041 public DatabaseClient(Copycat copycat) {
42 this.copycat = checkNotNull(copycat);
Madan Jampani08822c42014-11-04 17:17:46 -080043 }
44
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080045 public void waitForLeader() {
46 if (copycat.leader() != null) {
47 return;
48 }
Madan Jampani08822c42014-11-04 17:17:46 -080049
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080050 log.info("No leader in cluster, waiting for election.");
51 final CountDownLatch latch = new CountDownLatch(1);
52 final EventHandler<LeaderElectEvent> leaderLsnr = new EventHandler<LeaderElectEvent>() {
53
54 @Override
55 public void handle(LeaderElectEvent event) {
56 log.info("Leader chosen: {}", event);
57 latch.countDown();
58 }
59 };
60
61 copycat.event(LeaderElectEvent.class).registerHandler(leaderLsnr);
62 try {
63 while (copycat.leader() == null) {
64 latch.await(200, TimeUnit.MILLISECONDS);
65 }
66 log.info("Leader appeared: {}", copycat.leader());
67 return;
68 } catch (InterruptedException e) {
69 log.error("Interrupted while waiting for Leader", e);
70 Thread.currentThread().interrupt();
71 } finally {
72 copycat.event(LeaderElectEvent.class).unregisterHandler(leaderLsnr);
73 }
74 }
75
76 public boolean createTable(String tableName) {
77 waitForLeader();
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080078 CompletableFuture<Boolean> future = copycat.submit("createTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -080079 try {
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080080 return future.get();
Madan Jampani08822c42014-11-04 17:17:46 -080081 } catch (InterruptedException | ExecutionException e) {
82 throw new DatabaseException(e);
83 }
84 }
85
Madan Jampanidef2c652014-11-12 13:50:10 -080086 public boolean createTable(String tableName, int ttlMillis) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080087 waitForLeader();
Madan Jampanif5d263b2014-11-13 10:04:40 -080088 CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
Madan Jampanidef2c652014-11-12 13:50:10 -080089 try {
90 return future.get();
91 } catch (InterruptedException | ExecutionException e) {
92 throw new DatabaseException(e);
93 }
94 }
95
Madan Jampani08822c42014-11-04 17:17:46 -080096 public void dropTable(String tableName) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080097 waitForLeader();
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080098 CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -080099 try {
Yuta HIGUCHIf8468442014-11-11 10:09:20 -0800100 future.get();
Madan Jampani08822c42014-11-04 17:17:46 -0800101 } catch (InterruptedException | ExecutionException e) {
102 throw new DatabaseException(e);
103 }
104 }
105
106 public void dropAllTables() {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800107 waitForLeader();
Yuta HIGUCHIf8468442014-11-11 10:09:20 -0800108 CompletableFuture<Void> future = copycat.submit("dropAllTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800109 try {
Yuta HIGUCHIf8468442014-11-11 10:09:20 -0800110 future.get();
Madan Jampani08822c42014-11-04 17:17:46 -0800111 } catch (InterruptedException | ExecutionException e) {
112 throw new DatabaseException(e);
113 }
114 }
115
Madan Jampanif5d263b2014-11-13 10:04:40 -0800116 public Set<String> listTables() {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800117 waitForLeader();
Madan Jampani08822c42014-11-04 17:17:46 -0800118 try {
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -0800119 for (int i = 0; i < RETRIES; ++i) {
120 CompletableFuture<Set<String>> future = copycat.submit("listTables");
121 try {
122 return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
123 } catch (TimeoutException e) {
124 log.debug("Timed out retrying {}", i);
125 future.cancel(true);
126 waitForLeader();
127 }
128 }
129 // TODO: proper timeout handling
130 log.error("Timed out");
131 return Collections.emptySet();
Madan Jampani08822c42014-11-04 17:17:46 -0800132 } catch (InterruptedException | ExecutionException e) {
133 throw new DatabaseException(e);
134 }
135 }
136
Madan Jampani12390c12014-11-12 00:35:56 -0800137 public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800138 waitForLeader();
Madan Jampani12390c12014-11-12 00:35:56 -0800139 CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800140 try {
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -0800141 return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
Madan Jampani08822c42014-11-04 17:17:46 -0800142 } catch (InterruptedException | ExecutionException e) {
143 throw new DatabaseException(e);
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -0800144 } catch (TimeoutException e) {
145 throw new DatabaseException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800146 }
147 }
148
Madan Jampani12390c12014-11-12 00:35:56 -0800149 public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800150 waitForLeader();
Madan Jampani12390c12014-11-12 00:35:56 -0800151 CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800152 try {
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -0800153 return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
Madan Jampani08822c42014-11-04 17:17:46 -0800154 } catch (InterruptedException | ExecutionException e) {
155 throw new DatabaseException(e);
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -0800156 } catch (TimeoutException e) {
157 throw new DatabaseException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800158 }
159 }
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800160
161 public Map<String, VersionedValue> getAll(String tableName) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800162 waitForLeader();
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800163 CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName);
164 try {
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -0800165 return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800166 } catch (InterruptedException | ExecutionException e) {
167 throw new DatabaseException(e);
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -0800168 } catch (TimeoutException e) {
169 throw new DatabaseException(e);
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800170 }
171 }
Madan Jampani08822c42014-11-04 17:17:46 -0800172}