blob: 27e04c4c3f7f9c0950b2a6d7ae50d64d39ea64fa [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -08003import static com.google.common.base.Preconditions.checkNotNull;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -08004import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf8468442014-11-11 10:09:20 -08005
Madan Jampania88d1f52014-11-14 16:45:24 -08006import java.util.Arrays;
Madan Jampani08822c42014-11-04 17:17:46 -08007import java.util.List;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -08008import java.util.Map;
Madan Jampanif5d263b2014-11-13 10:04:40 -08009import java.util.Set;
Madan Jampania88d1f52014-11-14 16:45:24 -080010import java.util.UUID;
Madan Jampani08822c42014-11-04 17:17:46 -080011import java.util.concurrent.CompletableFuture;
12import java.util.concurrent.ExecutionException;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080013import java.util.concurrent.TimeUnit;
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -080014import java.util.concurrent.TimeoutException;
Madan Jampani08822c42014-11-04 17:17:46 -080015
Madan Jampania88d1f52014-11-14 16:45:24 -080016import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080017import net.kuujo.copycat.event.EventHandler;
18import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampania88d1f52014-11-14 16:45:24 -080019import net.kuujo.copycat.protocol.SubmitRequest;
20import net.kuujo.copycat.protocol.SubmitResponse;
21import net.kuujo.copycat.spi.protocol.ProtocolClient;
Madan Jampani08822c42014-11-04 17:17:46 -080022
Madan Jampani12390c12014-11-12 00:35:56 -080023import org.onlab.onos.store.service.BatchReadRequest;
24import org.onlab.onos.store.service.BatchWriteRequest;
Madan Jampani08822c42014-11-04 17:17:46 -080025import org.onlab.onos.store.service.DatabaseException;
Madan Jampani12390c12014-11-12 00:35:56 -080026import org.onlab.onos.store.service.ReadResult;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -080027import org.onlab.onos.store.service.VersionedValue;
Madan Jampani12390c12014-11-12 00:35:56 -080028import org.onlab.onos.store.service.WriteResult;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080029import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080030
Madan Jampani686fa182014-11-04 23:16:27 -080031/**
32 * Client for interacting with the Copycat Raft cluster.
33 */
Madan Jampania88d1f52014-11-14 16:45:24 -080034public class DatabaseClient implements EventHandler<LeaderElectEvent> {
Madan Jampani08822c42014-11-04 17:17:46 -080035
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -080036 private static final int RETRIES = 5;
37
38 private static final int TIMEOUT_MS = 2000;
39
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080040 private final Logger log = getLogger(getClass());
41
Madan Jampania88d1f52014-11-14 16:45:24 -080042 private final DatabaseProtocolService protocol;
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080043 private volatile ProtocolClient client = null;
44 private volatile TcpMember currentLeader = null;
45
Madan Jampani08822c42014-11-04 17:17:46 -080046
Madan Jampania88d1f52014-11-14 16:45:24 -080047 public DatabaseClient(DatabaseProtocolService protocol) {
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080048 this.protocol = checkNotNull(protocol);
Madan Jampania88d1f52014-11-14 16:45:24 -080049 }
50
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080051 // FIXME This handler relies on a fact that local node is part of Raft cluster
Madan Jampania88d1f52014-11-14 16:45:24 -080052 @Override
53 public void handle(LeaderElectEvent event) {
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080054 final TcpMember newLeader = event.leader();
Madan Jampania88d1f52014-11-14 16:45:24 -080055 if (newLeader != null && !newLeader.equals(currentLeader)) {
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080056 log.info("{} became the new leader", newLeader);
57 ProtocolClient prevClient = client;
58 ProtocolClient newclient = protocol.createClient(newLeader);
59 newclient.connect();
60 client = newclient;
Madan Jampania88d1f52014-11-14 16:45:24 -080061 currentLeader = newLeader;
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080062 if (prevClient != null) {
63 prevClient.close();
Madan Jampania88d1f52014-11-14 16:45:24 -080064 }
Madan Jampania88d1f52014-11-14 16:45:24 -080065 }
66 }
67
68 private String nextRequestId() {
69 return UUID.randomUUID().toString();
Madan Jampani08822c42014-11-04 17:17:46 -080070 }
71
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080072 public void waitForLeader() {
Madan Jampania88d1f52014-11-14 16:45:24 -080073 if (currentLeader != null) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080074 return;
75 }
Madan Jampani08822c42014-11-04 17:17:46 -080076
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080077 log.info("No leader in cluster, waiting for election.");
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080078
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080079 try {
Madan Jampania88d1f52014-11-14 16:45:24 -080080 while (currentLeader == null) {
81 Thread.sleep(200);
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080082 }
Madan Jampania88d1f52014-11-14 16:45:24 -080083 log.info("Leader appeared: {}", currentLeader);
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080084 return;
85 } catch (InterruptedException e) {
86 log.error("Interrupted while waiting for Leader", e);
87 Thread.currentThread().interrupt();
Madan Jampania88d1f52014-11-14 16:45:24 -080088 }
89 }
90
91 private <T> T submit(String operationName, Object... args) {
92 waitForLeader();
93 if (currentLeader == null) {
94 throw new DatabaseException("Raft cluster does not have a leader.");
95 }
96
97 SubmitRequest request =
98 new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
99
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -0800100 CompletableFuture<SubmitResponse> submitResponse = client.submit(request);
Madan Jampania88d1f52014-11-14 16:45:24 -0800101
102 log.debug("Sent {} to {}", request, currentLeader);
103
104 try {
105 return (T) submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS).result();
106 } catch (ExecutionException | InterruptedException e) {
107 throw new DatabaseException(e);
108 } catch (TimeoutException e) {
109 throw new DatabaseException.Timeout(e);
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800110 }
111 }
112
113 public boolean createTable(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800114 return submit("createTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -0800115 }
116
Madan Jampanidef2c652014-11-12 13:50:10 -0800117 public boolean createTable(String tableName, int ttlMillis) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800118 return submit("createTable", tableName, ttlMillis);
Madan Jampanidef2c652014-11-12 13:50:10 -0800119 }
120
Madan Jampani08822c42014-11-04 17:17:46 -0800121 public void dropTable(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800122 submit("dropTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -0800123 }
124
125 public void dropAllTables() {
Madan Jampania88d1f52014-11-14 16:45:24 -0800126 submit("dropAllTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800127 }
128
Madan Jampanif5d263b2014-11-13 10:04:40 -0800129 public Set<String> listTables() {
Madan Jampania88d1f52014-11-14 16:45:24 -0800130 return submit("listTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800131 }
132
Madan Jampani12390c12014-11-12 00:35:56 -0800133 public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800134 return submit("read", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800135 }
136
Madan Jampani12390c12014-11-12 00:35:56 -0800137 public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800138 return submit("write", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800139 }
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800140
141 public Map<String, VersionedValue> getAll(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800142 return submit("getAll", tableName);
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800143 }
Madan Jampani08822c42014-11-04 17:17:46 -0800144}