Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 1 | package org.onlab.onos.store.service.impl; |
| 2 | |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 3 | import static com.google.common.base.Preconditions.checkNotNull; |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 4 | import static org.slf4j.LoggerFactory.getLogger; |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 5 | |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 6 | import java.util.List; |
Yuta HIGUCHI | 841c0b6 | 2014-11-13 20:27:14 -0800 | [diff] [blame] | 7 | import java.util.Map; |
Madan Jampani | f5d263b | 2014-11-13 10:04:40 -0800 | [diff] [blame] | 8 | import java.util.Set; |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 9 | import java.util.concurrent.CompletableFuture; |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 10 | import java.util.concurrent.CountDownLatch; |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 11 | import java.util.concurrent.ExecutionException; |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 12 | import java.util.concurrent.TimeUnit; |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 13 | |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 14 | import net.kuujo.copycat.Copycat; |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 15 | import net.kuujo.copycat.event.EventHandler; |
| 16 | import net.kuujo.copycat.event.LeaderElectEvent; |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 17 | |
Madan Jampani | 12390c1 | 2014-11-12 00:35:56 -0800 | [diff] [blame] | 18 | import org.onlab.onos.store.service.BatchReadRequest; |
| 19 | import org.onlab.onos.store.service.BatchWriteRequest; |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 20 | import org.onlab.onos.store.service.DatabaseException; |
Madan Jampani | 12390c1 | 2014-11-12 00:35:56 -0800 | [diff] [blame] | 21 | import org.onlab.onos.store.service.ReadResult; |
Yuta HIGUCHI | 841c0b6 | 2014-11-13 20:27:14 -0800 | [diff] [blame] | 22 | import org.onlab.onos.store.service.VersionedValue; |
Madan Jampani | 12390c1 | 2014-11-12 00:35:56 -0800 | [diff] [blame] | 23 | import org.onlab.onos.store.service.WriteResult; |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 24 | import org.slf4j.Logger; |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 25 | |
Madan Jampani | 686fa18 | 2014-11-04 23:16:27 -0800 | [diff] [blame] | 26 | /** |
| 27 | * Client for interacting with the Copycat Raft cluster. |
| 28 | */ |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 29 | public class DatabaseClient { |
| 30 | |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 31 | private final Logger log = getLogger(getClass()); |
| 32 | |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 33 | private final Copycat copycat; |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 34 | |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 35 | public DatabaseClient(Copycat copycat) { |
| 36 | this.copycat = checkNotNull(copycat); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 37 | } |
| 38 | |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 39 | public void waitForLeader() { |
| 40 | if (copycat.leader() != null) { |
| 41 | return; |
| 42 | } |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 43 | |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 44 | log.info("No leader in cluster, waiting for election."); |
| 45 | final CountDownLatch latch = new CountDownLatch(1); |
| 46 | final EventHandler<LeaderElectEvent> leaderLsnr = new EventHandler<LeaderElectEvent>() { |
| 47 | |
| 48 | @Override |
| 49 | public void handle(LeaderElectEvent event) { |
| 50 | log.info("Leader chosen: {}", event); |
| 51 | latch.countDown(); |
| 52 | } |
| 53 | }; |
| 54 | |
| 55 | copycat.event(LeaderElectEvent.class).registerHandler(leaderLsnr); |
| 56 | try { |
| 57 | while (copycat.leader() == null) { |
| 58 | latch.await(200, TimeUnit.MILLISECONDS); |
| 59 | } |
| 60 | log.info("Leader appeared: {}", copycat.leader()); |
| 61 | return; |
| 62 | } catch (InterruptedException e) { |
| 63 | log.error("Interrupted while waiting for Leader", e); |
| 64 | Thread.currentThread().interrupt(); |
| 65 | } finally { |
| 66 | copycat.event(LeaderElectEvent.class).unregisterHandler(leaderLsnr); |
| 67 | } |
| 68 | } |
| 69 | |
| 70 | public boolean createTable(String tableName) { |
| 71 | waitForLeader(); |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 72 | CompletableFuture<Boolean> future = copycat.submit("createTable", tableName); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 73 | try { |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 74 | return future.get(); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 75 | } catch (InterruptedException | ExecutionException e) { |
| 76 | throw new DatabaseException(e); |
| 77 | } |
| 78 | } |
| 79 | |
Madan Jampani | def2c65 | 2014-11-12 13:50:10 -0800 | [diff] [blame] | 80 | public boolean createTable(String tableName, int ttlMillis) { |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 81 | waitForLeader(); |
Madan Jampani | f5d263b | 2014-11-13 10:04:40 -0800 | [diff] [blame] | 82 | CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName); |
Madan Jampani | def2c65 | 2014-11-12 13:50:10 -0800 | [diff] [blame] | 83 | try { |
| 84 | return future.get(); |
| 85 | } catch (InterruptedException | ExecutionException e) { |
| 86 | throw new DatabaseException(e); |
| 87 | } |
| 88 | } |
| 89 | |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 90 | public void dropTable(String tableName) { |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 91 | waitForLeader(); |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 92 | CompletableFuture<Void> future = copycat.submit("dropTable", tableName); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 93 | try { |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 94 | future.get(); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 95 | } catch (InterruptedException | ExecutionException e) { |
| 96 | throw new DatabaseException(e); |
| 97 | } |
| 98 | } |
| 99 | |
| 100 | public void dropAllTables() { |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 101 | waitForLeader(); |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 102 | CompletableFuture<Void> future = copycat.submit("dropAllTables"); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 103 | try { |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 104 | future.get(); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 105 | } catch (InterruptedException | ExecutionException e) { |
| 106 | throw new DatabaseException(e); |
| 107 | } |
| 108 | } |
| 109 | |
Madan Jampani | f5d263b | 2014-11-13 10:04:40 -0800 | [diff] [blame] | 110 | public Set<String> listTables() { |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 111 | waitForLeader(); |
Madan Jampani | f5d263b | 2014-11-13 10:04:40 -0800 | [diff] [blame] | 112 | CompletableFuture<Set<String>> future = copycat.submit("listTables"); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 113 | try { |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 114 | return future.get(); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 115 | } catch (InterruptedException | ExecutionException e) { |
| 116 | throw new DatabaseException(e); |
| 117 | } |
| 118 | } |
| 119 | |
Madan Jampani | 12390c1 | 2014-11-12 00:35:56 -0800 | [diff] [blame] | 120 | public List<ReadResult> batchRead(BatchReadRequest batchRequest) { |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 121 | waitForLeader(); |
Madan Jampani | 12390c1 | 2014-11-12 00:35:56 -0800 | [diff] [blame] | 122 | CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 123 | try { |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 124 | return future.get(); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 125 | } catch (InterruptedException | ExecutionException e) { |
| 126 | throw new DatabaseException(e); |
| 127 | } |
| 128 | } |
| 129 | |
Madan Jampani | 12390c1 | 2014-11-12 00:35:56 -0800 | [diff] [blame] | 130 | public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) { |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 131 | waitForLeader(); |
Madan Jampani | 12390c1 | 2014-11-12 00:35:56 -0800 | [diff] [blame] | 132 | CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 133 | try { |
Yuta HIGUCHI | f846844 | 2014-11-11 10:09:20 -0800 | [diff] [blame] | 134 | return future.get(); |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 135 | } catch (InterruptedException | ExecutionException e) { |
| 136 | throw new DatabaseException(e); |
| 137 | } |
| 138 | } |
Yuta HIGUCHI | 841c0b6 | 2014-11-13 20:27:14 -0800 | [diff] [blame] | 139 | |
| 140 | public Map<String, VersionedValue> getAll(String tableName) { |
Yuta HIGUCHI | 39da979 | 2014-11-14 02:07:04 -0800 | [diff] [blame^] | 141 | waitForLeader(); |
Yuta HIGUCHI | 841c0b6 | 2014-11-13 20:27:14 -0800 | [diff] [blame] | 142 | CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName); |
| 143 | try { |
| 144 | return future.get(); |
| 145 | } catch (InterruptedException | ExecutionException e) { |
| 146 | throw new DatabaseException(e); |
| 147 | } |
| 148 | } |
Madan Jampani | 08822c4 | 2014-11-04 17:17:46 -0800 | [diff] [blame] | 149 | } |