/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.service.impl;
Madan Jampani08822c42014-11-04 17:17:46 -080017
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -080018import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
19import static org.onlab.util.Tools.namedThreads;
Madan Jampani08822c42014-11-04 17:17:46 -080020import static org.slf4j.LoggerFactory.getLogger;
21
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080022import java.io.File;
23import java.io.IOException;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080024import java.util.Collection;
25import java.util.Collections;
26import java.util.HashSet;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080027import java.util.Map;
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -080028import java.util.Optional;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080029import java.util.Set;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080030import java.util.concurrent.CountDownLatch;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080031import java.util.concurrent.ExecutionException;
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -080032import java.util.concurrent.ScheduledExecutorService;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080033import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080034
35import net.kuujo.copycat.Copycat;
Yuta HIGUCHI4e450812014-11-23 01:53:28 -080036import net.kuujo.copycat.CopycatConfig;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080037import net.kuujo.copycat.cluster.ClusterConfig;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080038import net.kuujo.copycat.cluster.Member;
Madan Jampani08822c42014-11-04 17:17:46 -080039import net.kuujo.copycat.cluster.TcpCluster;
40import net.kuujo.copycat.cluster.TcpClusterConfig;
41import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani5ce30252014-11-17 20:53:17 -080042import net.kuujo.copycat.event.EventHandler;
Madan Jampanif5d263b2014-11-13 10:04:40 -080043import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampani08822c42014-11-04 17:17:46 -080044import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080045
Madan Jampani08822c42014-11-04 17:17:46 -080046import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080047import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080048import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080049import org.apache.felix.scr.annotations.Reference;
50import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080051import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080052import org.onosproject.cluster.ClusterEvent;
53import org.onosproject.cluster.ClusterEventListener;
54import org.onosproject.cluster.ClusterService;
55import org.onosproject.cluster.ControllerNode;
56import org.onosproject.cluster.DefaultControllerNode;
57import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
58import org.onosproject.store.cluster.messaging.ClusterMessage;
59import org.onosproject.store.cluster.messaging.MessageSubject;
60import org.onosproject.store.service.BatchReadRequest;
61import org.onosproject.store.service.BatchReadResult;
62import org.onosproject.store.service.BatchWriteRequest;
63import org.onosproject.store.service.BatchWriteResult;
64import org.onosproject.store.service.DatabaseAdminService;
65import org.onosproject.store.service.DatabaseException;
66import org.onosproject.store.service.DatabaseService;
67import org.onosproject.store.service.ReadResult;
68import org.onosproject.store.service.ReadStatus;
69import org.onosproject.store.service.VersionedValue;
70import org.onosproject.store.service.WriteResult;
71import org.onosproject.store.service.WriteStatus;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080072import org.onlab.packet.IpAddress;
Madan Jampani08822c42014-11-04 17:17:46 -080073import org.slf4j.Logger;
74
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080075import com.google.common.collect.ImmutableList;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080076
Madan Jampani08822c42014-11-04 17:17:46 -080077/**
78 * Strongly consistent and durable state management service based on
79 * Copycat implementation of Raft consensus protocol.
80 */
Brian O'Connor5d55ed42014-12-01 18:27:47 -080081@Component(immediate = false)
Madan Jampanidfbfa182014-11-04 22:06:41 -080082@Service
Madan Jampani08822c42014-11-04 17:17:46 -080083public class DatabaseManager implements DatabaseService, DatabaseAdminService {
84
Yuta HIGUCHI28052222014-11-20 16:45:32 -080085 private static final int RETRY_MS = 500;
86
87 private static final int ACTIVATE_MAX_RETRIES = 100;
88
Madan Jampani08822c42014-11-04 17:17:46 -080089 private final Logger log = getLogger(getClass());
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080092 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080093
Madan Jampani9b19a822014-11-04 21:37:13 -080094 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanidef2c652014-11-12 13:50:10 -080095 protected ClusterCommunicationService clusterCommunicator;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080098 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080099
Yuta HIGUCHI43e3a7e2014-11-30 23:22:11 -0800100 public static final String LOG_FILE_PREFIX = "raft/onos-copy-cat-log_";
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800101
102 // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
Pavlin Radoslavov190f8f92014-11-11 15:56:14 -0800103 // TODO: Set the path to /opt/onos/config
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800104 private static final String CONFIG_DIR = "../config";
105
106 private static final String DEFAULT_MEMBER_FILE = "tablets.json";
107
108 private static final String DEFAULT_TABLET = "default";
109
110 // TODO: make this configurable
111 // initial member configuration file path
112 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
Madan Jampani08822c42014-11-04 17:17:46 -0800113
Madan Jampani5ce30252014-11-17 20:53:17 -0800114 public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
115 new MessageSubject("raft-leader-election-event");
116
Madan Jampani08822c42014-11-04 17:17:46 -0800117 private Copycat copycat;
118 private DatabaseClient client;
119
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800120 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800121 private ClusterConfig<TcpMember> clusterConfig;
122
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800123 private CountDownLatch clusterEventLatch;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800124 private ClusterEventListener clusterEventListener;
125
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800126 private Map<String, Set<DefaultControllerNode>> tabletMembers;
127
128 private boolean autoAddMember = false;
129
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800130 private ScheduledExecutorService executor;
131
132 private volatile LeaderElectEvent myLeaderEvent = null;
133
Yuta HIGUCHId88400b2014-11-25 12:13:30 -0800134 // TODO make this configurable
Madan Jampani4bb70c62014-11-25 23:47:12 -0800135 private int maxLogSizeBytes = 128 * (1024 * 1024);
Yuta HIGUCHI4e450812014-11-23 01:53:28 -0800136
Yuta HIGUCHId88400b2014-11-25 12:13:30 -0800137 // TODO make this configurable
138 private long electionTimeoutMs = 5000; // CopyCat default: 2000
139
Madan Jampani08822c42014-11-04 17:17:46 -0800140 @Activate
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800141 public void activate() throws InterruptedException, ExecutionException {
Madan Jampanidfbfa182014-11-04 22:06:41 -0800142
Yuta HIGUCHI43e3a7e2014-11-30 23:22:11 -0800143 // KARAF_DATA
144 // http://karaf.apache.org/manual/latest/users-guide/start-stop.html
145 final String dataDir = System.getProperty("karaf.data", "./data");
146
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800147 // load tablet configuration
148 File file = new File(CONFIG_DIR, initialMemberConfig);
149 log.info("Loading config: {}", file.getAbsolutePath());
150 TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
151 try {
152 tabletMembers = tabletDef.read();
153 } catch (IOException e) {
154 log.error("Failed to load tablet config {}", file);
155 throw new IllegalStateException("Failed to load tablet config", e);
156 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800157
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800158 // load default tablet configuration and start copycat
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800159 clusterConfig = new TcpClusterConfig();
Madan Jampani5ce30252014-11-17 20:53:17 -0800160 Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
161 if (defaultMembers == null || defaultMembers.isEmpty()) {
162 log.error("No members found in [{}] tablet configuration.",
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800163 DEFAULT_TABLET);
164 throw new IllegalStateException("No member found in tablet configuration");
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800165
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800166 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800167
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800168 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani5ce30252014-11-17 20:53:17 -0800169 for (ControllerNode member : defaultMembers) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800170 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
171 member.tcpPort());
172 if (localNode.equals(member)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800173 clusterConfig.setLocalMember(tcpMember);
174 } else {
175 clusterConfig.addRemoteMember(tcpMember);
176 }
177 }
178
Madan Jampani5ce30252014-11-17 20:53:17 -0800179 if (clusterConfig.getLocalMember() != null) {
180
181 // Wait for a minimum viable Raft cluster to boot up.
182 waitForClusterQuorum();
183
184 final TcpCluster cluster;
185 synchronized (clusterConfig) {
186 // Create the cluster.
187 cluster = new TcpCluster(clusterConfig);
188 }
189 log.info("Starting cluster: {}", cluster);
190
191 DatabaseEntryExpirationTracker expirationTracker =
192 new DatabaseEntryExpirationTracker(
193 clusterConfig.getLocalMember(),
194 clusterService.getLocalNode(),
195 clusterCommunicator,
196 this);
197
198 DatabaseStateMachine stateMachine = new DatabaseStateMachine();
199 stateMachine.addEventListener(expirationTracker);
Yuta HIGUCHI43e3a7e2014-11-30 23:22:11 -0800200 Log consensusLog = new MapDBLog(dataDir + "/" + LOG_FILE_PREFIX + localNode.id(),
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800201 ClusterMessagingProtocol.DB_SERIALIZER);
Madan Jampani5ce30252014-11-17 20:53:17 -0800202
Yuta HIGUCHI4e450812014-11-23 01:53:28 -0800203 CopycatConfig ccConfig = new CopycatConfig();
204 ccConfig.setMaxLogSize(maxLogSizeBytes);
Yuta HIGUCHId88400b2014-11-25 12:13:30 -0800205 ccConfig.setElectionTimeout(electionTimeoutMs);
Yuta HIGUCHI4e450812014-11-23 01:53:28 -0800206
207 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol, ccConfig);
Madan Jampani5ce30252014-11-17 20:53:17 -0800208 copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
209 copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
210 }
211
212 client = new DatabaseClient(copycatMessagingProtocol);
213 clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
214
215 // Starts copycat if this node is a participant
216 // of the Raft cluster.
217 if (copycat != null) {
218 copycat.start().get();
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800219
220 executor =
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800221 newSingleThreadScheduledExecutor(namedThreads("onos-db-heartbeat-%d"));
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800222 executor.scheduleWithFixedDelay(new LeaderAdvertiser(), 5, 2, TimeUnit.SECONDS);
223
Madan Jampani5ce30252014-11-17 20:53:17 -0800224 }
225
226 client.waitForLeader();
Madan Jampani71582ed2014-11-18 10:06:01 -0800227
228 // Try and list the tables to verify database manager is
229 // in a state where it can serve requests.
230 tryTableListing();
231
Madan Jampani5ce30252014-11-17 20:53:17 -0800232 log.info("Started.");
233 }
234
235 @Deactivate
236 public void deactivate() {
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800237 if (executor != null) {
238 executor.shutdownNow();
239 }
Madan Jampani5ce30252014-11-17 20:53:17 -0800240 clusterService.removeListener(clusterEventListener);
241 // TODO: ClusterCommunicationService must support more than one
242 // handler per message subject.
243 clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
244 if (copycat != null) {
245 copycat.stop();
246 }
247 log.info("Stopped.");
248 }
249
250 private void waitForClusterQuorum() {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800251 // note: from this point beyond, clusterConfig requires synchronization
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800252 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800253 clusterEventListener = new InternalClusterEventListener();
254 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800255
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800256 final int raftClusterSize = clusterConfig.getMembers().size();
257 final int raftClusterQuorumSize = (int) (Math.floor(raftClusterSize / 2)) + 1;
258 if (clusterService.getNodes().size() < raftClusterQuorumSize) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800259 // current cluster size smaller then expected
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800260 try {
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800261 final int waitTimeSec = 120;
262 log.info("Waiting for a maximum of {}s for raft cluster quorum to boot up...", waitTimeSec);
263 if (!clusterEventLatch.await(waitTimeSec, TimeUnit.SECONDS)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800264 log.info("Starting with {}/{} nodes cluster",
265 clusterService.getNodes().size(),
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800266 raftClusterSize);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800267 }
268 } catch (InterruptedException e) {
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800269 log.info("Interrupted waiting for raft quorum.", e);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800270 }
271 }
Madan Jampani08822c42014-11-04 17:17:46 -0800272 }
273
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800274 private void tryTableListing() throws InterruptedException {
Madan Jampani71582ed2014-11-18 10:06:01 -0800275 int retries = 0;
276 do {
277 try {
278 listTables();
279 return;
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800280 } catch (DatabaseException.Timeout e) {
281 log.debug("Failed to listTables. Will retry...", e);
Madan Jampani71582ed2014-11-18 10:06:01 -0800282 } catch (DatabaseException e) {
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800283 log.debug("Failed to listTables. Will retry later...", e);
284 Thread.sleep(RETRY_MS);
Yuta HIGUCHI01365622014-11-25 17:35:15 -0800285 }
286 if (retries == ACTIVATE_MAX_RETRIES) {
287 log.error("Failed to listTables after multiple attempts. Giving up.");
288 // Exiting hoping things will be fixed by the time
289 // others start using the service
290 return;
Madan Jampani71582ed2014-11-18 10:06:01 -0800291 }
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800292 retries++;
Madan Jampani71582ed2014-11-18 10:06:01 -0800293 } while (true);
294 }
295
Madan Jampani08822c42014-11-04 17:17:46 -0800296 @Override
297 public boolean createTable(String name) {
298 return client.createTable(name);
299 }
300
301 @Override
Madan Jampanidef2c652014-11-12 13:50:10 -0800302 public boolean createTable(String name, int ttlMillis) {
303 return client.createTable(name, ttlMillis);
304 }
305
306 @Override
Madan Jampani08822c42014-11-04 17:17:46 -0800307 public void dropTable(String name) {
308 client.dropTable(name);
309 }
310
311 @Override
312 public void dropAllTables() {
313 client.dropAllTables();
314 }
315
316 @Override
Madan Jampanif5d263b2014-11-13 10:04:40 -0800317 public Set<String> listTables() {
Madan Jampani08822c42014-11-04 17:17:46 -0800318 return client.listTables();
319 }
320
321 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800322 public VersionedValue get(String tableName, String key) {
323 BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
324 ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
325 if (readResult.status().equals(ReadStatus.OK)) {
326 return readResult.value();
Madan Jampani08822c42014-11-04 17:17:46 -0800327 }
Madan Jampani12390c12014-11-12 00:35:56 -0800328 throw new DatabaseException("get failed due to status: " + readResult.status());
Madan Jampani08822c42014-11-04 17:17:46 -0800329 }
330
331 @Override
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800332 public Map<String, VersionedValue> getAll(String tableName) {
333 return client.getAll(tableName);
334 }
335
336
337 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800338 public BatchReadResult batchRead(BatchReadRequest batchRequest) {
339 return new BatchReadResult(client.batchRead(batchRequest));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800340 }
341
342 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800343 public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
344 return new BatchWriteResult(client.batchWrite(batchRequest));
Madan Jampani08822c42014-11-04 17:17:46 -0800345 }
346
347 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800348 public VersionedValue put(String tableName, String key, byte[] value) {
349 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
350 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
351 if (writeResult.status().equals(WriteStatus.OK)) {
352 return writeResult.previousValue();
Madan Jampani08822c42014-11-04 17:17:46 -0800353 }
Madan Jampani12390c12014-11-12 00:35:56 -0800354 throw new DatabaseException("put failed due to status: " + writeResult.status());
355 }
Madan Jampani08822c42014-11-04 17:17:46 -0800356
Madan Jampani12390c12014-11-12 00:35:56 -0800357 @Override
358 public boolean putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800359 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
360 .putIfAbsent(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800361 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
362 if (writeResult.status().equals(WriteStatus.OK)) {
363 return true;
364 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
365 return false;
366 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800367 throw new DatabaseException("putIfAbsent failed due to status: "
368 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800369 }
370
371 @Override
372 public boolean putIfVersionMatches(String tableName, String key,
373 byte[] value, long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800374 BatchWriteRequest batchRequest =
375 new BatchWriteRequest.Builder()
376 .putIfVersionMatches(tableName, key, value, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800377 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
378 if (writeResult.status().equals(WriteStatus.OK)) {
379 return true;
380 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
381 return false;
382 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800383 throw new DatabaseException("putIfVersionMatches failed due to status: "
384 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800385 }
386
387 @Override
388 public boolean putIfValueMatches(String tableName, String key,
389 byte[] oldValue, byte[] newValue) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800390 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
391 .putIfValueMatches(tableName, key, oldValue, newValue).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800392 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
393 if (writeResult.status().equals(WriteStatus.OK)) {
394 return true;
395 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
396 return false;
397 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800398 throw new DatabaseException("putIfValueMatches failed due to status: "
399 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800400 }
401
402 @Override
403 public VersionedValue remove(String tableName, String key) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800404 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
405 .remove(tableName, key).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800406 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
407 if (writeResult.status().equals(WriteStatus.OK)) {
408 return writeResult.previousValue();
409 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800410 throw new DatabaseException("remove failed due to status: "
411 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800412 }
413
414 @Override
415 public boolean removeIfVersionMatches(String tableName, String key,
416 long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800417 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
418 .removeIfVersionMatches(tableName, key, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800419 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
420 if (writeResult.status().equals(WriteStatus.OK)) {
421 return true;
422 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
423 return false;
424 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800425 throw new DatabaseException("removeIfVersionMatches failed due to status: "
426 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800427 }
428
429 @Override
430 public boolean removeIfValueMatches(String tableName, String key,
431 byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800432 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
433 .removeIfValueMatches(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800434 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
435 if (writeResult.status().equals(WriteStatus.OK)) {
436 return true;
437 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
438 return false;
439 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800440 throw new DatabaseException("removeIfValueMatches failed due to status: "
441 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800442 }
443
444 @Override
445 public void addMember(final ControllerNode node) {
446 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
447 node.tcpPort());
448 log.info("{} was added to the cluster", tcpMember);
449 synchronized (clusterConfig) {
450 clusterConfig.addRemoteMember(tcpMember);
451 }
Madan Jampani08822c42014-11-04 17:17:46 -0800452 }
453
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800454 @Override
455 public Optional<ControllerNode> leader() {
456 if (copycat != null) {
457 if (copycat.isLeader()) {
458 return Optional.of(clusterService.getLocalNode());
459 }
460 Member leader = copycat.cluster().remoteMember(copycat.leader());
461 return Optional.ofNullable(getNodeIdFromMember(leader));
462 }
463 return Optional.ofNullable(getNodeIdFromMember(client.getCurrentLeader()));
464 }
465
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800466 private final class LeaderAdvertiser implements Runnable {
467
Madan Jampani5ce30252014-11-17 20:53:17 -0800468 @Override
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800469 public void run() {
Madan Jampani5ce30252014-11-17 20:53:17 -0800470 try {
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800471 LeaderElectEvent event = myLeaderEvent;
472 if (event != null) {
473 log.trace("Broadcasting RAFT_LEADER_ELECTION_EVENT: {}", event);
Madan Jampani5ce30252014-11-17 20:53:17 -0800474 // This node just became the leader.
475 clusterCommunicator.broadcastIncludeSelf(
476 new ClusterMessage(
477 clusterService.getLocalNode().id(),
478 RAFT_LEADER_ELECTION_EVENT,
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800479 ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
Madan Jampani5ce30252014-11-17 20:53:17 -0800480 }
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800481 } catch (Exception e) {
482 log.debug("LeaderAdvertiser failed with exception", e);
483 }
484 }
485
486 }
487
488 private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
489 @Override
490 public void handle(LeaderElectEvent event) {
491 try {
492 log.debug("Received LeaderElectEvent: {}", event);
493 if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
494 log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
495 myLeaderEvent = event;
496 // This node just became the leader.
497 clusterCommunicator.broadcastIncludeSelf(
498 new ClusterMessage(
499 clusterService.getLocalNode().id(),
500 RAFT_LEADER_ELECTION_EVENT,
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800501 ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800502 } else {
503 if (myLeaderEvent != null) {
504 log.debug("This node is no longer the Leader");
505 }
506 myLeaderEvent = null;
507 }
Madan Jampani5ce30252014-11-17 20:53:17 -0800508 } catch (IOException e) {
509 log.error("Failed to broadcast raft leadership change event", e);
510 }
511 }
512 }
513
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800514 private final class InternalClusterEventListener
Madan Jampani12390c12014-11-12 00:35:56 -0800515 implements ClusterEventListener {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800516
517 @Override
518 public void event(ClusterEvent event) {
519 // TODO: Not every node should be part of the consensus ring.
520
521 final ControllerNode node = event.subject();
522 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
Madan Jampani12390c12014-11-12 00:35:56 -0800523 node.tcpPort());
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800524
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800525 switch (event.type()) {
526 case INSTANCE_ACTIVATED:
527 case INSTANCE_ADDED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800528 if (autoAddMember) {
529 synchronized (clusterConfig) {
530 if (!clusterConfig.getMembers().contains(tcpMember)) {
531 log.info("{} was automatically added to the cluster", tcpMember);
532 clusterConfig.addRemoteMember(tcpMember);
533 }
534 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800535 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800536 break;
537 case INSTANCE_DEACTIVATED:
538 case INSTANCE_REMOVED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800539 if (autoAddMember) {
540 Set<DefaultControllerNode> members
Madan Jampani12390c12014-11-12 00:35:56 -0800541 = tabletMembers.getOrDefault(DEFAULT_TABLET,
542 Collections.emptySet());
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800543 // remove only if not the initial members
544 if (!members.contains(node)) {
545 synchronized (clusterConfig) {
546 if (clusterConfig.getMembers().contains(tcpMember)) {
547 log.info("{} was automatically removed from the cluster", tcpMember);
548 clusterConfig.removeRemoteMember(tcpMember);
549 }
550 }
551 }
552 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800553 break;
554 default:
555 break;
556 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800557 if (copycat != null) {
558 log.debug("Current cluster: {}", copycat.cluster());
559 }
560 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800561 }
562
563 }
564
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800565 @Override
566 public void removeMember(final ControllerNode node) {
567 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
568 node.tcpPort());
569 log.info("{} was removed from the cluster", tcpMember);
570 synchronized (clusterConfig) {
571 clusterConfig.removeRemoteMember(tcpMember);
572 }
573 }
574
575 @Override
576 public Collection<ControllerNode> listMembers() {
577 if (copycat == null) {
578 return ImmutableList.of();
579 }
580 Set<ControllerNode> members = new HashSet<>();
581 for (Member member : copycat.cluster().members()) {
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800582 ControllerNode node = getNodeIdFromMember(member);
583 if (node == null) {
584 log.info("No Node found for {}", member);
585 continue;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800586 }
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800587 members.add(node);
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800588 }
589 return members;
590 }
591
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800592 private ControllerNode getNodeIdFromMember(Member member) {
593 if (member instanceof TcpMember) {
594 final TcpMember tcpMember = (TcpMember) member;
595 // TODO assuming tcpMember#host to be IP address,
596 // but if not lookup DNS, etc. first
597 IpAddress ip = IpAddress.valueOf(tcpMember.host());
598 int tcpPort = tcpMember.port();
599 for (ControllerNode node : clusterService.getNodes()) {
600 if (node.ip().equals(ip) &&
601 node.tcpPort() == tcpPort) {
602 return node;
603 }
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800604 }
605 }
606 return null;
607 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800608}