blob: 71608ef21f46d1b7d8c71b564a14f2b9850fe6a7 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -08003import static com.google.common.base.Preconditions.checkNotNull;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -08004import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf8468442014-11-11 10:09:20 -08005
Madan Jampania88d1f52014-11-14 16:45:24 -08006import java.util.Arrays;
Madan Jampani08822c42014-11-04 17:17:46 -08007import java.util.List;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -08008import java.util.Map;
Madan Jampanif5d263b2014-11-13 10:04:40 -08009import java.util.Set;
Madan Jampania88d1f52014-11-14 16:45:24 -080010import java.util.UUID;
Madan Jampani08822c42014-11-04 17:17:46 -080011import java.util.concurrent.CompletableFuture;
12import java.util.concurrent.ExecutionException;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080013import java.util.concurrent.TimeUnit;
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -080014import java.util.concurrent.TimeoutException;
Madan Jampani08822c42014-11-04 17:17:46 -080015
Madan Jampani5ce30252014-11-17 20:53:17 -080016import net.kuujo.copycat.cluster.Member;
Madan Jampania88d1f52014-11-14 16:45:24 -080017import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080018import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampania88d1f52014-11-14 16:45:24 -080019import net.kuujo.copycat.protocol.SubmitRequest;
20import net.kuujo.copycat.protocol.SubmitResponse;
21import net.kuujo.copycat.spi.protocol.ProtocolClient;
Madan Jampani08822c42014-11-04 17:17:46 -080022
Madan Jampani5ce30252014-11-17 20:53:17 -080023import org.onlab.onos.store.cluster.messaging.ClusterMessage;
24import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani12390c12014-11-12 00:35:56 -080025import org.onlab.onos.store.service.BatchReadRequest;
26import org.onlab.onos.store.service.BatchWriteRequest;
Madan Jampani08822c42014-11-04 17:17:46 -080027import org.onlab.onos.store.service.DatabaseException;
Madan Jampani12390c12014-11-12 00:35:56 -080028import org.onlab.onos.store.service.ReadResult;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -080029import org.onlab.onos.store.service.VersionedValue;
Madan Jampani12390c12014-11-12 00:35:56 -080030import org.onlab.onos.store.service.WriteResult;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080031import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080032
Madan Jampani686fa182014-11-04 23:16:27 -080033/**
34 * Client for interacting with the Copycat Raft cluster.
35 */
Madan Jampani5ce30252014-11-17 20:53:17 -080036public class DatabaseClient implements ClusterMessageHandler {
Madan Jampani08822c42014-11-04 17:17:46 -080037
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -080038 private static final int RETRIES = 5;
39
40 private static final int TIMEOUT_MS = 2000;
41
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080042 private final Logger log = getLogger(getClass());
43
Madan Jampania88d1f52014-11-14 16:45:24 -080044 private final DatabaseProtocolService protocol;
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080045 private volatile ProtocolClient client = null;
Madan Jampani5ce30252014-11-17 20:53:17 -080046 private volatile Member currentLeader = null;
47 private volatile long currentLeaderTerm = 0;
Madan Jampani08822c42014-11-04 17:17:46 -080048
Madan Jampania88d1f52014-11-14 16:45:24 -080049 public DatabaseClient(DatabaseProtocolService protocol) {
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080050 this.protocol = checkNotNull(protocol);
Madan Jampania88d1f52014-11-14 16:45:24 -080051 }
52
53 @Override
Madan Jampani5ce30252014-11-17 20:53:17 -080054 public void handle(ClusterMessage message) {
55 LeaderElectEvent event =
56 ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
57 TcpMember newLeader = event.leader();
58 long newLeaderTerm = event.term();
59 if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
60 log.info("New leader detected. Leader: {}, term: {}", newLeader, newLeaderTerm);
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080061 ProtocolClient prevClient = client;
Madan Jampani5ce30252014-11-17 20:53:17 -080062 ProtocolClient newClient = protocol.createClient(newLeader);
63 newClient.connect();
64 client = newClient;
Madan Jampania88d1f52014-11-14 16:45:24 -080065 currentLeader = newLeader;
Madan Jampani5ce30252014-11-17 20:53:17 -080066 currentLeaderTerm = newLeaderTerm;
67
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080068 if (prevClient != null) {
69 prevClient.close();
Madan Jampania88d1f52014-11-14 16:45:24 -080070 }
Madan Jampania88d1f52014-11-14 16:45:24 -080071 }
72 }
73
74 private String nextRequestId() {
75 return UUID.randomUUID().toString();
Madan Jampani08822c42014-11-04 17:17:46 -080076 }
77
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080078 public void waitForLeader() {
Madan Jampania88d1f52014-11-14 16:45:24 -080079 if (currentLeader != null) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080080 return;
81 }
Madan Jampani08822c42014-11-04 17:17:46 -080082
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080083 log.info("No leader in cluster, waiting for election.");
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080084
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080085 try {
Madan Jampania88d1f52014-11-14 16:45:24 -080086 while (currentLeader == null) {
87 Thread.sleep(200);
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080088 }
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080089 return;
90 } catch (InterruptedException e) {
91 log.error("Interrupted while waiting for Leader", e);
92 Thread.currentThread().interrupt();
Madan Jampania88d1f52014-11-14 16:45:24 -080093 }
94 }
95
96 private <T> T submit(String operationName, Object... args) {
97 waitForLeader();
98 if (currentLeader == null) {
99 throw new DatabaseException("Raft cluster does not have a leader.");
100 }
101
102 SubmitRequest request =
103 new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
104
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -0800105 CompletableFuture<SubmitResponse> submitResponse = client.submit(request);
Madan Jampania88d1f52014-11-14 16:45:24 -0800106
107 log.debug("Sent {} to {}", request, currentLeader);
108
109 try {
110 return (T) submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS).result();
111 } catch (ExecutionException | InterruptedException e) {
112 throw new DatabaseException(e);
113 } catch (TimeoutException e) {
114 throw new DatabaseException.Timeout(e);
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800115 }
116 }
117
118 public boolean createTable(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800119 return submit("createTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -0800120 }
121
Madan Jampanidef2c652014-11-12 13:50:10 -0800122 public boolean createTable(String tableName, int ttlMillis) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800123 return submit("createTable", tableName, ttlMillis);
Madan Jampanidef2c652014-11-12 13:50:10 -0800124 }
125
Madan Jampani08822c42014-11-04 17:17:46 -0800126 public void dropTable(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800127 submit("dropTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -0800128 }
129
130 public void dropAllTables() {
Madan Jampania88d1f52014-11-14 16:45:24 -0800131 submit("dropAllTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800132 }
133
Madan Jampanif5d263b2014-11-13 10:04:40 -0800134 public Set<String> listTables() {
Madan Jampania88d1f52014-11-14 16:45:24 -0800135 return submit("listTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800136 }
137
Madan Jampani12390c12014-11-12 00:35:56 -0800138 public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800139 return submit("read", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800140 }
141
Madan Jampani12390c12014-11-12 00:35:56 -0800142 public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800143 return submit("write", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800144 }
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800145
146 public Map<String, VersionedValue> getAll(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800147 return submit("getAll", tableName);
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800148 }
Madan Jampani08822c42014-11-04 17:17:46 -0800149}