blob: 869b1883669b9c4438a1db931e7c61d39d15e923 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.service.impl;
Madan Jampani08822c42014-11-04 17:17:46 -080017
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080018import static com.google.common.base.Preconditions.checkNotNull;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080019import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf8468442014-11-11 10:09:20 -080020
Madan Jampania88d1f52014-11-14 16:45:24 -080021import java.util.Arrays;
Madan Jampani08822c42014-11-04 17:17:46 -080022import java.util.List;
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -080023import java.util.Map;
Madan Jampanif5d263b2014-11-13 10:04:40 -080024import java.util.Set;
Madan Jampania88d1f52014-11-14 16:45:24 -080025import java.util.UUID;
Madan Jampani08822c42014-11-04 17:17:46 -080026import java.util.concurrent.CompletableFuture;
27import java.util.concurrent.ExecutionException;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080028import java.util.concurrent.TimeUnit;
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -080029import java.util.concurrent.TimeoutException;
Madan Jampani08822c42014-11-04 17:17:46 -080030
Madan Jampani5ce30252014-11-17 20:53:17 -080031import net.kuujo.copycat.cluster.Member;
Madan Jampania88d1f52014-11-14 16:45:24 -080032import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080033import net.kuujo.copycat.event.LeaderElectEvent;
Yuta HIGUCHId09f1012014-11-20 10:00:10 -080034import net.kuujo.copycat.protocol.Response.Status;
Madan Jampania88d1f52014-11-14 16:45:24 -080035import net.kuujo.copycat.protocol.SubmitRequest;
36import net.kuujo.copycat.protocol.SubmitResponse;
37import net.kuujo.copycat.spi.protocol.ProtocolClient;
Madan Jampani08822c42014-11-04 17:17:46 -080038
Brian O'Connorabafb502014-12-02 22:26:20 -080039import org.onosproject.store.cluster.messaging.ClusterMessage;
40import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
41import org.onosproject.store.service.BatchReadRequest;
42import org.onosproject.store.service.BatchWriteRequest;
43import org.onosproject.store.service.DatabaseException;
44import org.onosproject.store.service.ReadResult;
45import org.onosproject.store.service.VersionedValue;
46import org.onosproject.store.service.WriteResult;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080047import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080048
Madan Jampani686fa182014-11-04 23:16:27 -080049/**
50 * Client for interacting with the Copycat Raft cluster.
51 */
Madan Jampani5ce30252014-11-17 20:53:17 -080052public class DatabaseClient implements ClusterMessageHandler {
Madan Jampani08822c42014-11-04 17:17:46 -080053
Yuta HIGUCHIb9d68662014-11-14 16:06:03 -080054 private static final int RETRIES = 5;
55
56 private static final int TIMEOUT_MS = 2000;
57
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080058 private final Logger log = getLogger(getClass());
59
Madan Jampania88d1f52014-11-14 16:45:24 -080060 private final DatabaseProtocolService protocol;
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080061 private volatile ProtocolClient client = null;
Madan Jampani5ce30252014-11-17 20:53:17 -080062 private volatile Member currentLeader = null;
63 private volatile long currentLeaderTerm = 0;
Madan Jampani08822c42014-11-04 17:17:46 -080064
Madan Jampania88d1f52014-11-14 16:45:24 -080065 public DatabaseClient(DatabaseProtocolService protocol) {
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080066 this.protocol = checkNotNull(protocol);
Madan Jampania88d1f52014-11-14 16:45:24 -080067 }
68
69 @Override
Madan Jampani5ce30252014-11-17 20:53:17 -080070 public void handle(ClusterMessage message) {
71 LeaderElectEvent event =
Yuta HIGUCHI91768e32014-11-22 05:06:35 -080072 ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
Madan Jampani5ce30252014-11-17 20:53:17 -080073 TcpMember newLeader = event.leader();
74 long newLeaderTerm = event.term();
75 if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
76 log.info("New leader detected. Leader: {}, term: {}", newLeader, newLeaderTerm);
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080077 ProtocolClient prevClient = client;
Madan Jampani5ce30252014-11-17 20:53:17 -080078 ProtocolClient newClient = protocol.createClient(newLeader);
79 newClient.connect();
80 client = newClient;
Madan Jampania88d1f52014-11-14 16:45:24 -080081 currentLeader = newLeader;
Madan Jampani5ce30252014-11-17 20:53:17 -080082 currentLeaderTerm = newLeaderTerm;
83
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -080084 if (prevClient != null) {
85 prevClient.close();
Madan Jampania88d1f52014-11-14 16:45:24 -080086 }
Madan Jampania88d1f52014-11-14 16:45:24 -080087 }
88 }
89
90 private String nextRequestId() {
91 return UUID.randomUUID().toString();
Madan Jampani08822c42014-11-04 17:17:46 -080092 }
93
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080094 public void waitForLeader() {
Madan Jampania88d1f52014-11-14 16:45:24 -080095 if (currentLeader != null) {
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080096 return;
97 }
Madan Jampani08822c42014-11-04 17:17:46 -080098
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080099 log.info("No leader in cluster, waiting for election.");
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800100
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800101 try {
Madan Jampania88d1f52014-11-14 16:45:24 -0800102 while (currentLeader == null) {
103 Thread.sleep(200);
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800104 }
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800105 return;
106 } catch (InterruptedException e) {
107 log.error("Interrupted while waiting for Leader", e);
108 Thread.currentThread().interrupt();
Madan Jampania88d1f52014-11-14 16:45:24 -0800109 }
110 }
111
112 private <T> T submit(String operationName, Object... args) {
113 waitForLeader();
114 if (currentLeader == null) {
115 throw new DatabaseException("Raft cluster does not have a leader.");
116 }
117
118 SubmitRequest request =
119 new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
120
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -0800121 CompletableFuture<SubmitResponse> submitResponse = client.submit(request);
Madan Jampania88d1f52014-11-14 16:45:24 -0800122
123 log.debug("Sent {} to {}", request, currentLeader);
124
125 try {
Yuta HIGUCHId09f1012014-11-20 10:00:10 -0800126 final SubmitResponse response = submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
127 if (response.status() != Status.OK) {
128 throw new DatabaseException(response.error());
129 }
130 return (T) response.result();
Madan Jampania88d1f52014-11-14 16:45:24 -0800131 } catch (ExecutionException | InterruptedException e) {
132 throw new DatabaseException(e);
133 } catch (TimeoutException e) {
134 throw new DatabaseException.Timeout(e);
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800135 }
136 }
137
138 public boolean createTable(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800139 return submit("createTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -0800140 }
141
Madan Jampanidef2c652014-11-12 13:50:10 -0800142 public boolean createTable(String tableName, int ttlMillis) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800143 return submit("createTable", tableName, ttlMillis);
Madan Jampanidef2c652014-11-12 13:50:10 -0800144 }
145
Madan Jampani08822c42014-11-04 17:17:46 -0800146 public void dropTable(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800147 submit("dropTable", tableName);
Madan Jampani08822c42014-11-04 17:17:46 -0800148 }
149
150 public void dropAllTables() {
Madan Jampania88d1f52014-11-14 16:45:24 -0800151 submit("dropAllTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800152 }
153
Madan Jampanif5d263b2014-11-13 10:04:40 -0800154 public Set<String> listTables() {
Madan Jampania88d1f52014-11-14 16:45:24 -0800155 return submit("listTables");
Madan Jampani08822c42014-11-04 17:17:46 -0800156 }
157
Madan Jampani12390c12014-11-12 00:35:56 -0800158 public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800159 return submit("read", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800160 }
161
Madan Jampani12390c12014-11-12 00:35:56 -0800162 public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800163 return submit("write", batchRequest);
Madan Jampani08822c42014-11-04 17:17:46 -0800164 }
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800165
166 public Map<String, VersionedValue> getAll(String tableName) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800167 return submit("getAll", tableName);
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800168 }
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800169
170 Member getCurrentLeader() {
171 return currentLeader;
172 }
Madan Jampani08822c42014-11-04 17:17:46 -0800173}