blob: 876f6cce68296883d0a62da56a57d17917561986 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHI39da9792014-11-14 02:07:04 -08003import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf8468442014-11-11 10:09:20 -08004
Madan Jampania88d1f52014-11-14 16:45:24 -08005import java.util.Arrays;
Madan Jampani08822c42014-11-04 17:17:46 -08006import java.util.List;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -08007import java.util.Map;
Madan Jampanif5d263b2014-11-13 10:04:40 -08008import java.util.Set;
Madan Jampania88d1f52014-11-14 16:45:24 -08009import java.util.UUID;
Madan Jampani08822c42014-11-04 17:17:46 -080010import java.util.concurrent.CompletableFuture;
11import java.util.concurrent.ExecutionException;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080012import java.util.concurrent.TimeUnit;
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -080013import java.util.concurrent.TimeoutException;
Madan Jampani08822c42014-11-04 17:17:46 -080014
Madan Jampania88d1f52014-11-14 16:45:24 -080015import net.kuujo.copycat.cluster.Member;
16import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080017import net.kuujo.copycat.event.EventHandler;
18import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampania88d1f52014-11-14 16:45:24 -080019import net.kuujo.copycat.protocol.SubmitRequest;
20import net.kuujo.copycat.protocol.SubmitResponse;
21import net.kuujo.copycat.spi.protocol.ProtocolClient;
Madan Jampani08822c42014-11-04 17:17:46 -080022
Madan Jampani12390c12014-11-12 00:35:56 -080023import org.onlab.onos.store.service.BatchReadRequest;
24import org.onlab.onos.store.service.BatchWriteRequest;
Madan Jampani08822c42014-11-04 17:17:46 -080025import org.onlab.onos.store.service.DatabaseException;
Madan Jampani12390c12014-11-12 00:35:56 -080026import org.onlab.onos.store.service.ReadResult;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -080027import org.onlab.onos.store.service.VersionedValue;
Madan Jampani12390c12014-11-12 00:35:56 -080028import org.onlab.onos.store.service.WriteResult;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080029import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080030
Madan Jampani686fa182014-11-04 23:16:27 -080031/**
32 * Client for interacting with the Copycat Raft cluster.
33 */
Madan Jampania88d1f52014-11-14 16:45:24 -080034public class DatabaseClient implements EventHandler<LeaderElectEvent> {
Madan Jampani08822c42014-11-04 17:17:46 -080035
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -080036 private static final int RETRIES = 5;
37
38 private static final int TIMEOUT_MS = 2000;
39
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080040 private final Logger log = getLogger(getClass());
41
Madan Jampania88d1f52014-11-14 16:45:24 -080042 private final DatabaseProtocolService protocol;
43 private volatile ProtocolClient copycat = null;
44 private volatile Member currentLeader = null;
Madan Jampani08822c42014-11-04 17:17:46 -080045
Madan Jampania88d1f52014-11-14 16:45:24 -080046 public DatabaseClient(DatabaseProtocolService protocol) {
47 this.protocol = protocol;
48 }
49
50 @Override
51 public void handle(LeaderElectEvent event) {
52 Member newLeader = event.leader();
53 if (newLeader != null && !newLeader.equals(currentLeader)) {
54 currentLeader = newLeader;
55 if (copycat != null) {
56 copycat.close();
57 }
58 copycat = protocol.createClient((TcpMember) currentLeader);
59 copycat.connect();
60 }
61 }
62
63 private String nextRequestId() {
64 return UUID.randomUUID().toString();
Madan Jampani08822c42014-11-04 17:17:46 -080065 }
66
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080067 public void waitForLeader() {
Madan Jampania88d1f52014-11-14 16:45:24 -080068 if (currentLeader != null) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080069 return;
70 }
Madan Jampani08822c42014-11-04 17:17:46 -080071
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080072 log.info("No leader in cluster, waiting for election.");
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080073
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080074 try {
Madan Jampania88d1f52014-11-14 16:45:24 -080075 while (currentLeader == null) {
76 Thread.sleep(200);
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080077 }
Madan Jampania88d1f52014-11-14 16:45:24 -080078 log.info("Leader appeared: {}", currentLeader);
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080079 return;
80 } catch (InterruptedException e) {
81 log.error("Interrupted while waiting for Leader", e);
82 Thread.currentThread().interrupt();
Madan Jampania88d1f52014-11-14 16:45:24 -080083 }
84 }
85
86 private <T> T submit(String operationName, Object... args) {
87 waitForLeader();
88 if (currentLeader == null) {
89 throw new DatabaseException("Raft cluster does not have a leader.");
90 }
91
92 SubmitRequest request =
93 new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
94
95 CompletableFuture<SubmitResponse> submitResponse = copycat.submit(request);
96
97 log.debug("Sent {} to {}", request, currentLeader);
98
99 try {
100 return (T) submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS).result();
101 } catch (ExecutionException | InterruptedException e) {
102 throw new DatabaseException(e);
103 } catch (TimeoutException e) {
104 throw new DatabaseException.Timeout(e);
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800105 }
106 }
107
108 public boolean createTable(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800109 return submit("createTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -0800110 }
111
Madan Jampanidef2c652014-11-12 13:50:10 -0800112 public boolean createTable(String tableName, int ttlMillis) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800113 return submit("createTable", tableName, ttlMillis);
Madan Jampanidef2c652014-11-12 13:50:10 -0800114 }
115
Madan Jampani08822c42014-11-04 17:17:46 -0800116 public void dropTable(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800117 submit("dropTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -0800118 }
119
120 public void dropAllTables() {
Madan Jampania88d1f52014-11-14 16:45:24 -0800121 submit("dropAllTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800122 }
123
Madan Jampanif5d263b2014-11-13 10:04:40 -0800124 public Set<String> listTables() {
Madan Jampania88d1f52014-11-14 16:45:24 -0800125 return submit("listTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800126 }
127
Madan Jampani12390c12014-11-12 00:35:56 -0800128 public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800129 return submit("read", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800130 }
131
Madan Jampani12390c12014-11-12 00:35:56 -0800132 public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800133 return submit("write", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800134 }
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800135
136 public Map<String, VersionedValue> getAll(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800137 return submit("getAll", tableName);
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800138 }
Madan Jampani08822c42014-11-04 17:17:46 -0800139}