blob: 2e066967d24853ddfd4d1327bed5c862f779e435 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -08003import static com.google.common.base.Preconditions.checkNotNull;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -08004import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf8468442014-11-11 10:09:20 -08005
Madan Jampania88d1f52014-11-14 16:45:24 -08006import java.util.Arrays;
Madan Jampani08822c42014-11-04 17:17:46 -08007import java.util.List;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -08008import java.util.Map;
Madan Jampanif5d263b2014-11-13 10:04:40 -08009import java.util.Set;
Madan Jampania88d1f52014-11-14 16:45:24 -080010import java.util.UUID;
Madan Jampani08822c42014-11-04 17:17:46 -080011import java.util.concurrent.CompletableFuture;
12import java.util.concurrent.ExecutionException;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080013import java.util.concurrent.TimeUnit;
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -080014import java.util.concurrent.TimeoutException;
Madan Jampani08822c42014-11-04 17:17:46 -080015
Madan Jampani5ce30252014-11-17 20:53:17 -080016import net.kuujo.copycat.cluster.Member;
Madan Jampania88d1f52014-11-14 16:45:24 -080017import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080018import net.kuujo.copycat.event.LeaderElectEvent;
Yuta HIGUCHId09f1012014-11-20 10:00:10 -080019import net.kuujo.copycat.protocol.Response.Status;
Madan Jampania88d1f52014-11-14 16:45:24 -080020import net.kuujo.copycat.protocol.SubmitRequest;
21import net.kuujo.copycat.protocol.SubmitResponse;
22import net.kuujo.copycat.spi.protocol.ProtocolClient;
Madan Jampani08822c42014-11-04 17:17:46 -080023
Madan Jampani5ce30252014-11-17 20:53:17 -080024import org.onlab.onos.store.cluster.messaging.ClusterMessage;
25import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani12390c12014-11-12 00:35:56 -080026import org.onlab.onos.store.service.BatchReadRequest;
27import org.onlab.onos.store.service.BatchWriteRequest;
Madan Jampani08822c42014-11-04 17:17:46 -080028import org.onlab.onos.store.service.DatabaseException;
Madan Jampani12390c12014-11-12 00:35:56 -080029import org.onlab.onos.store.service.ReadResult;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -080030import org.onlab.onos.store.service.VersionedValue;
Madan Jampani12390c12014-11-12 00:35:56 -080031import org.onlab.onos.store.service.WriteResult;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080032import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080033
Madan Jampani686fa182014-11-04 23:16:27 -080034/**
35 * Client for interacting with the Copycat Raft cluster.
36 */
Madan Jampani5ce30252014-11-17 20:53:17 -080037public class DatabaseClient implements ClusterMessageHandler {
Madan Jampani08822c42014-11-04 17:17:46 -080038
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -080039 private static final int RETRIES = 5;
40
41 private static final int TIMEOUT_MS = 2000;
42
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080043 private final Logger log = getLogger(getClass());
44
Madan Jampania88d1f52014-11-14 16:45:24 -080045 private final DatabaseProtocolService protocol;
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080046 private volatile ProtocolClient client = null;
Madan Jampani5ce30252014-11-17 20:53:17 -080047 private volatile Member currentLeader = null;
48 private volatile long currentLeaderTerm = 0;
Madan Jampani08822c42014-11-04 17:17:46 -080049
Madan Jampania88d1f52014-11-14 16:45:24 -080050 public DatabaseClient(DatabaseProtocolService protocol) {
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080051 this.protocol = checkNotNull(protocol);
Madan Jampania88d1f52014-11-14 16:45:24 -080052 }
53
54 @Override
Madan Jampani5ce30252014-11-17 20:53:17 -080055 public void handle(ClusterMessage message) {
56 LeaderElectEvent event =
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080057 ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
Madan Jampani5ce30252014-11-17 20:53:17 -080058 TcpMember newLeader = event.leader();
59 long newLeaderTerm = event.term();
60 if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
61 log.info("New leader detected. Leader: {}, term: {}", newLeader, newLeaderTerm);
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080062 ProtocolClient prevClient = client;
Madan Jampani5ce30252014-11-17 20:53:17 -080063 ProtocolClient newClient = protocol.createClient(newLeader);
64 newClient.connect();
65 client = newClient;
Madan Jampania88d1f52014-11-14 16:45:24 -080066 currentLeader = newLeader;
Madan Jampani5ce30252014-11-17 20:53:17 -080067 currentLeaderTerm = newLeaderTerm;
68
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080069 if (prevClient != null) {
70 prevClient.close();
Madan Jampania88d1f52014-11-14 16:45:24 -080071 }
Madan Jampania88d1f52014-11-14 16:45:24 -080072 }
73 }
74
75 private String nextRequestId() {
76 return UUID.randomUUID().toString();
Madan Jampani08822c42014-11-04 17:17:46 -080077 }
78
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080079 public void waitForLeader() {
Madan Jampania88d1f52014-11-14 16:45:24 -080080 if (currentLeader != null) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080081 return;
82 }
Madan Jampani08822c42014-11-04 17:17:46 -080083
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080084 log.info("No leader in cluster, waiting for election.");
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080085
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080086 try {
Madan Jampania88d1f52014-11-14 16:45:24 -080087 while (currentLeader == null) {
88 Thread.sleep(200);
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080089 }
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080090 return;
91 } catch (InterruptedException e) {
92 log.error("Interrupted while waiting for Leader", e);
93 Thread.currentThread().interrupt();
Madan Jampania88d1f52014-11-14 16:45:24 -080094 }
95 }
96
97 private <T> T submit(String operationName, Object... args) {
98 waitForLeader();
99 if (currentLeader == null) {
100 throw new DatabaseException("Raft cluster does not have a leader.");
101 }
102
103 SubmitRequest request =
104 new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
105
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -0800106 CompletableFuture<SubmitResponse> submitResponse = client.submit(request);
Madan Jampania88d1f52014-11-14 16:45:24 -0800107
108 log.debug("Sent {} to {}", request, currentLeader);
109
110 try {
Yuta HIGUCHId09f1012014-11-20 10:00:10 -0800111 final SubmitResponse response = submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
112 if (response.status() != Status.OK) {
113 throw new DatabaseException(response.error());
114 }
115 return (T) response.result();
Madan Jampania88d1f52014-11-14 16:45:24 -0800116 } catch (ExecutionException | InterruptedException e) {
117 throw new DatabaseException(e);
118 } catch (TimeoutException e) {
119 throw new DatabaseException.Timeout(e);
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800120 }
121 }
122
123 public boolean createTable(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800124 return submit("createTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -0800125 }
126
Madan Jampanidef2c652014-11-12 13:50:10 -0800127 public boolean createTable(String tableName, int ttlMillis) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800128 return submit("createTable", tableName, ttlMillis);
Madan Jampanidef2c652014-11-12 13:50:10 -0800129 }
130
Madan Jampani08822c42014-11-04 17:17:46 -0800131 public void dropTable(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800132 submit("dropTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -0800133 }
134
135 public void dropAllTables() {
Madan Jampania88d1f52014-11-14 16:45:24 -0800136 submit("dropAllTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800137 }
138
Madan Jampanif5d263b2014-11-13 10:04:40 -0800139 public Set<String> listTables() {
Madan Jampania88d1f52014-11-14 16:45:24 -0800140 return submit("listTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800141 }
142
Madan Jampani12390c12014-11-12 00:35:56 -0800143 public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800144 return submit("read", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800145 }
146
Madan Jampani12390c12014-11-12 00:35:56 -0800147 public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800148 return submit("write", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800149 }
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800150
151 public Map<String, VersionedValue> getAll(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800152 return submit("getAll", tableName);
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800153 }
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800154
155 Member getCurrentLeader() {
156 return currentLeader;
157 }
Madan Jampani08822c42014-11-04 17:17:46 -0800158}