blob: bd898b6f05f4baba6c90e1c1f87def0cce0c195b [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHIf8468442014-11-11 10:09:20 -08003import static com.google.common.base.Preconditions.checkNotNull;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -08004import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf8468442014-11-11 10:09:20 -08005
Madan Jampani08822c42014-11-04 17:17:46 -08006import java.util.List;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -08007import java.util.Map;
Madan Jampanif5d263b2014-11-13 10:04:40 -08008import java.util.Set;
Madan Jampani08822c42014-11-04 17:17:46 -08009import java.util.concurrent.CompletableFuture;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080010import java.util.concurrent.CountDownLatch;
Madan Jampani08822c42014-11-04 17:17:46 -080011import java.util.concurrent.ExecutionException;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080012import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080013
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080014import net.kuujo.copycat.Copycat;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080015import net.kuujo.copycat.event.EventHandler;
16import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampani08822c42014-11-04 17:17:46 -080017
Madan Jampani12390c12014-11-12 00:35:56 -080018import org.onlab.onos.store.service.BatchReadRequest;
19import org.onlab.onos.store.service.BatchWriteRequest;
Madan Jampani08822c42014-11-04 17:17:46 -080020import org.onlab.onos.store.service.DatabaseException;
Madan Jampani12390c12014-11-12 00:35:56 -080021import org.onlab.onos.store.service.ReadResult;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -080022import org.onlab.onos.store.service.VersionedValue;
Madan Jampani12390c12014-11-12 00:35:56 -080023import org.onlab.onos.store.service.WriteResult;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080024import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080025
Madan Jampani686fa182014-11-04 23:16:27 -080026/**
27 * Client for interacting with the Copycat Raft cluster.
28 */
Madan Jampani08822c42014-11-04 17:17:46 -080029public class DatabaseClient {
30
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080031 private final Logger log = getLogger(getClass());
32
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080033 private final Copycat copycat;
Madan Jampani08822c42014-11-04 17:17:46 -080034
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080035 public DatabaseClient(Copycat copycat) {
36 this.copycat = checkNotNull(copycat);
Madan Jampani08822c42014-11-04 17:17:46 -080037 }
38
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080039 public void waitForLeader() {
40 if (copycat.leader() != null) {
41 return;
42 }
Madan Jampani08822c42014-11-04 17:17:46 -080043
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080044 log.info("No leader in cluster, waiting for election.");
45 final CountDownLatch latch = new CountDownLatch(1);
46 final EventHandler<LeaderElectEvent> leaderLsnr = new EventHandler<LeaderElectEvent>() {
47
48 @Override
49 public void handle(LeaderElectEvent event) {
50 log.info("Leader chosen: {}", event);
51 latch.countDown();
52 }
53 };
54
55 copycat.event(LeaderElectEvent.class).registerHandler(leaderLsnr);
56 try {
57 while (copycat.leader() == null) {
58 latch.await(200, TimeUnit.MILLISECONDS);
59 }
60 log.info("Leader appeared: {}", copycat.leader());
61 return;
62 } catch (InterruptedException e) {
63 log.error("Interrupted while waiting for Leader", e);
64 Thread.currentThread().interrupt();
65 } finally {
66 copycat.event(LeaderElectEvent.class).unregisterHandler(leaderLsnr);
67 }
68 }
69
70 public boolean createTable(String tableName) {
71 waitForLeader();
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080072 CompletableFuture<Boolean> future = copycat.submit("createTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -080073 try {
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080074 return future.get();
Madan Jampani08822c42014-11-04 17:17:46 -080075 } catch (InterruptedException | ExecutionException e) {
76 throw new DatabaseException(e);
77 }
78 }
79
Madan Jampanidef2c652014-11-12 13:50:10 -080080 public boolean createTable(String tableName, int ttlMillis) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080081 waitForLeader();
Madan Jampanif5d263b2014-11-13 10:04:40 -080082 CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
Madan Jampanidef2c652014-11-12 13:50:10 -080083 try {
84 return future.get();
85 } catch (InterruptedException | ExecutionException e) {
86 throw new DatabaseException(e);
87 }
88 }
89
Madan Jampani08822c42014-11-04 17:17:46 -080090 public void dropTable(String tableName) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080091 waitForLeader();
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080092 CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -080093 try {
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080094 future.get();
Madan Jampani08822c42014-11-04 17:17:46 -080095 } catch (InterruptedException | ExecutionException e) {
96 throw new DatabaseException(e);
97 }
98 }
99
100 public void dropAllTables() {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800101 waitForLeader();
Yuta HIGUCHIf8468442014-11-11 10:09:20 -0800102 CompletableFuture<Void> future = copycat.submit("dropAllTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800103 try {
Yuta HIGUCHIf8468442014-11-11 10:09:20 -0800104 future.get();
Madan Jampani08822c42014-11-04 17:17:46 -0800105 } catch (InterruptedException | ExecutionException e) {
106 throw new DatabaseException(e);
107 }
108 }
109
Madan Jampanif5d263b2014-11-13 10:04:40 -0800110 public Set<String> listTables() {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800111 waitForLeader();
Madan Jampanif5d263b2014-11-13 10:04:40 -0800112 CompletableFuture<Set<String>> future = copycat.submit("listTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800113 try {
Yuta HIGUCHIf8468442014-11-11 10:09:20 -0800114 return future.get();
Madan Jampani08822c42014-11-04 17:17:46 -0800115 } catch (InterruptedException | ExecutionException e) {
116 throw new DatabaseException(e);
117 }
118 }
119
Madan Jampani12390c12014-11-12 00:35:56 -0800120 public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800121 waitForLeader();
Madan Jampani12390c12014-11-12 00:35:56 -0800122 CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800123 try {
Yuta HIGUCHIf8468442014-11-11 10:09:20 -0800124 return future.get();
Madan Jampani08822c42014-11-04 17:17:46 -0800125 } catch (InterruptedException | ExecutionException e) {
126 throw new DatabaseException(e);
127 }
128 }
129
Madan Jampani12390c12014-11-12 00:35:56 -0800130 public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800131 waitForLeader();
Madan Jampani12390c12014-11-12 00:35:56 -0800132 CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800133 try {
Yuta HIGUCHIf8468442014-11-11 10:09:20 -0800134 return future.get();
Madan Jampani08822c42014-11-04 17:17:46 -0800135 } catch (InterruptedException | ExecutionException e) {
136 throw new DatabaseException(e);
137 }
138 }
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800139
140 public Map<String, VersionedValue> getAll(String tableName) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800141 waitForLeader();
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800142 CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName);
143 try {
144 return future.get();
145 } catch (InterruptedException | ExecutionException e) {
146 throw new DatabaseException(e);
147 }
148 }
Madan Jampani08822c42014-11-04 17:17:46 -0800149}