blob: e5a742edc3089668ac3957cf4d3a1fc9a9d74b3e [file] [log] [blame]
alshabibab984662014-12-04 18:56:18 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.service.impl;
Madan Jampani08822c42014-11-04 17:17:46 -080017
Jonathan Hart7d656f42015-01-27 14:07:23 -080018import com.google.common.collect.ImmutableList;
Madan Jampani08822c42014-11-04 17:17:46 -080019import net.kuujo.copycat.Copycat;
Yuta HIGUCHI4e450812014-11-23 01:53:28 -080020import net.kuujo.copycat.CopycatConfig;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080021import net.kuujo.copycat.cluster.ClusterConfig;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080022import net.kuujo.copycat.cluster.Member;
Madan Jampani08822c42014-11-04 17:17:46 -080023import net.kuujo.copycat.cluster.TcpCluster;
24import net.kuujo.copycat.cluster.TcpClusterConfig;
25import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani5ce30252014-11-17 20:53:17 -080026import net.kuujo.copycat.event.EventHandler;
Madan Jampanif5d263b2014-11-13 10:04:40 -080027import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampani08822c42014-11-04 17:17:46 -080028import net.kuujo.copycat.log.Log;
Madan Jampani08822c42014-11-04 17:17:46 -080029import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080030import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080031import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080032import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080034import org.apache.felix.scr.annotations.Service;
Jonathan Hart7d656f42015-01-27 14:07:23 -080035import org.onlab.packet.IpAddress;
Brian O'Connorabafb502014-12-02 22:26:20 -080036import org.onosproject.cluster.ClusterEvent;
37import org.onosproject.cluster.ClusterEventListener;
38import org.onosproject.cluster.ClusterService;
39import org.onosproject.cluster.ControllerNode;
40import org.onosproject.cluster.DefaultControllerNode;
41import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
42import org.onosproject.store.cluster.messaging.ClusterMessage;
43import org.onosproject.store.cluster.messaging.MessageSubject;
44import org.onosproject.store.service.BatchReadRequest;
45import org.onosproject.store.service.BatchReadResult;
46import org.onosproject.store.service.BatchWriteRequest;
47import org.onosproject.store.service.BatchWriteResult;
48import org.onosproject.store.service.DatabaseAdminService;
49import org.onosproject.store.service.DatabaseException;
50import org.onosproject.store.service.DatabaseService;
51import org.onosproject.store.service.ReadResult;
52import org.onosproject.store.service.ReadStatus;
53import org.onosproject.store.service.VersionedValue;
54import org.onosproject.store.service.WriteResult;
55import org.onosproject.store.service.WriteStatus;
Madan Jampani08822c42014-11-04 17:17:46 -080056import org.slf4j.Logger;
57
Jonathan Hart7d656f42015-01-27 14:07:23 -080058import java.io.File;
59import java.io.IOException;
60import java.util.Collection;
61import java.util.Collections;
62import java.util.HashSet;
63import java.util.Map;
64import java.util.Optional;
65import java.util.Set;
66import java.util.concurrent.CountDownLatch;
67import java.util.concurrent.ExecutionException;
68import java.util.concurrent.ScheduledExecutorService;
69import java.util.concurrent.TimeUnit;
70
71import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
72import static org.onlab.util.Tools.namedThreads;
73import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080074
/**
 * Strongly consistent and durable state management service based on
 * the Copycat implementation of the Raft consensus protocol.
 */
Brian O'Connor5d55ed42014-12-01 18:27:47 -080079@Component(immediate = false)
Madan Jampanidfbfa182014-11-04 22:06:41 -080080@Service
Madan Jampani08822c42014-11-04 17:17:46 -080081public class DatabaseManager implements DatabaseService, DatabaseAdminService {
82
Yuta HIGUCHI28052222014-11-20 16:45:32 -080083 private static final int RETRY_MS = 500;
84
85 private static final int ACTIVATE_MAX_RETRIES = 100;
86
Madan Jampani08822c42014-11-04 17:17:46 -080087 private final Logger log = getLogger(getClass());
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080090 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080091
Madan Jampani9b19a822014-11-04 21:37:13 -080092 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanidef2c652014-11-12 13:50:10 -080093 protected ClusterCommunicationService clusterCommunicator;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080096 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080097
Yuta HIGUCHI43e3a7e2014-11-30 23:22:11 -080098 public static final String LOG_FILE_PREFIX = "raft/onos-copy-cat-log_";
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080099
100 // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
Pavlin Radoslavov190f8f92014-11-11 15:56:14 -0800101 // TODO: Set the path to /opt/onos/config
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800102 private static final String CONFIG_DIR = "../config";
103
104 private static final String DEFAULT_MEMBER_FILE = "tablets.json";
105
106 private static final String DEFAULT_TABLET = "default";
107
108 // TODO: make this configurable
109 // initial member configuration file path
110 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
Madan Jampani08822c42014-11-04 17:17:46 -0800111
Madan Jampani5ce30252014-11-17 20:53:17 -0800112 public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
113 new MessageSubject("raft-leader-election-event");
114
Madan Jampani08822c42014-11-04 17:17:46 -0800115 private Copycat copycat;
116 private DatabaseClient client;
117
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800118 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800119 private ClusterConfig<TcpMember> clusterConfig;
120
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800121 private CountDownLatch clusterEventLatch;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800122 private ClusterEventListener clusterEventListener;
123
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800124 private Map<String, Set<DefaultControllerNode>> tabletMembers;
125
126 private boolean autoAddMember = false;
127
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800128 private ScheduledExecutorService executor;
129
130 private volatile LeaderElectEvent myLeaderEvent = null;
131
Yuta HIGUCHId88400b2014-11-25 12:13:30 -0800132 // TODO make this configurable
Madan Jampani4bb70c62014-11-25 23:47:12 -0800133 private int maxLogSizeBytes = 128 * (1024 * 1024);
Yuta HIGUCHI4e450812014-11-23 01:53:28 -0800134
Yuta HIGUCHId88400b2014-11-25 12:13:30 -0800135 // TODO make this configurable
136 private long electionTimeoutMs = 5000; // CopyCat default: 2000
137
Madan Jampani08822c42014-11-04 17:17:46 -0800138 @Activate
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800139 public void activate() throws InterruptedException, ExecutionException {
Madan Jampanidfbfa182014-11-04 22:06:41 -0800140
Yuta HIGUCHI43e3a7e2014-11-30 23:22:11 -0800141 // KARAF_DATA
142 // http://karaf.apache.org/manual/latest/users-guide/start-stop.html
143 final String dataDir = System.getProperty("karaf.data", "./data");
144
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800145 // load tablet configuration
146 File file = new File(CONFIG_DIR, initialMemberConfig);
147 log.info("Loading config: {}", file.getAbsolutePath());
148 TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
149 try {
150 tabletMembers = tabletDef.read();
151 } catch (IOException e) {
152 log.error("Failed to load tablet config {}", file);
153 throw new IllegalStateException("Failed to load tablet config", e);
154 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800155
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800156 // load default tablet configuration and start copycat
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800157 clusterConfig = new TcpClusterConfig();
Madan Jampani5ce30252014-11-17 20:53:17 -0800158 Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
159 if (defaultMembers == null || defaultMembers.isEmpty()) {
160 log.error("No members found in [{}] tablet configuration.",
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800161 DEFAULT_TABLET);
162 throw new IllegalStateException("No member found in tablet configuration");
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800163
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800164 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800165
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800166 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani5ce30252014-11-17 20:53:17 -0800167 for (ControllerNode member : defaultMembers) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800168 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
169 member.tcpPort());
170 if (localNode.equals(member)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800171 clusterConfig.setLocalMember(tcpMember);
172 } else {
173 clusterConfig.addRemoteMember(tcpMember);
174 }
175 }
176
Madan Jampani5ce30252014-11-17 20:53:17 -0800177 if (clusterConfig.getLocalMember() != null) {
178
179 // Wait for a minimum viable Raft cluster to boot up.
180 waitForClusterQuorum();
181
182 final TcpCluster cluster;
183 synchronized (clusterConfig) {
184 // Create the cluster.
185 cluster = new TcpCluster(clusterConfig);
186 }
187 log.info("Starting cluster: {}", cluster);
188
189 DatabaseEntryExpirationTracker expirationTracker =
190 new DatabaseEntryExpirationTracker(
191 clusterConfig.getLocalMember(),
192 clusterService.getLocalNode(),
193 clusterCommunicator,
194 this);
195
196 DatabaseStateMachine stateMachine = new DatabaseStateMachine();
197 stateMachine.addEventListener(expirationTracker);
Yuta HIGUCHI43e3a7e2014-11-30 23:22:11 -0800198 Log consensusLog = new MapDBLog(dataDir + "/" + LOG_FILE_PREFIX + localNode.id(),
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800199 ClusterMessagingProtocol.DB_SERIALIZER);
Madan Jampani5ce30252014-11-17 20:53:17 -0800200
Yuta HIGUCHI4e450812014-11-23 01:53:28 -0800201 CopycatConfig ccConfig = new CopycatConfig();
202 ccConfig.setMaxLogSize(maxLogSizeBytes);
Yuta HIGUCHId88400b2014-11-25 12:13:30 -0800203 ccConfig.setElectionTimeout(electionTimeoutMs);
Yuta HIGUCHI4e450812014-11-23 01:53:28 -0800204
205 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol, ccConfig);
Madan Jampani5ce30252014-11-17 20:53:17 -0800206 copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
207 copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
208 }
209
210 client = new DatabaseClient(copycatMessagingProtocol);
211 clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
212
213 // Starts copycat if this node is a participant
214 // of the Raft cluster.
215 if (copycat != null) {
216 copycat.start().get();
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800217
218 executor =
Thomas Vachuska9ea3e6f2015-01-23 16:34:22 -0800219 newSingleThreadScheduledExecutor(namedThreads("onos-db-heartbeat-%d"));
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800220 executor.scheduleWithFixedDelay(new LeaderAdvertiser(), 5, 2, TimeUnit.SECONDS);
221
Madan Jampani5ce30252014-11-17 20:53:17 -0800222 }
223
224 client.waitForLeader();
Madan Jampani71582ed2014-11-18 10:06:01 -0800225
226 // Try and list the tables to verify database manager is
227 // in a state where it can serve requests.
228 tryTableListing();
229
Madan Jampani5ce30252014-11-17 20:53:17 -0800230 log.info("Started.");
231 }
232
233 @Deactivate
234 public void deactivate() {
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800235 if (executor != null) {
236 executor.shutdownNow();
237 }
Madan Jampani5ce30252014-11-17 20:53:17 -0800238 clusterService.removeListener(clusterEventListener);
239 // TODO: ClusterCommunicationService must support more than one
240 // handler per message subject.
241 clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
242 if (copycat != null) {
243 copycat.stop();
244 }
245 log.info("Stopped.");
246 }
247
248 private void waitForClusterQuorum() {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800249 // note: from this point beyond, clusterConfig requires synchronization
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800250 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800251 clusterEventListener = new InternalClusterEventListener();
252 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800253
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800254 final int raftClusterSize = clusterConfig.getMembers().size();
255 final int raftClusterQuorumSize = (int) (Math.floor(raftClusterSize / 2)) + 1;
256 if (clusterService.getNodes().size() < raftClusterQuorumSize) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800257 // current cluster size smaller then expected
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800258 try {
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800259 final int waitTimeSec = 120;
260 log.info("Waiting for a maximum of {}s for raft cluster quorum to boot up...", waitTimeSec);
261 if (!clusterEventLatch.await(waitTimeSec, TimeUnit.SECONDS)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800262 log.info("Starting with {}/{} nodes cluster",
263 clusterService.getNodes().size(),
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800264 raftClusterSize);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800265 }
266 } catch (InterruptedException e) {
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800267 log.info("Interrupted waiting for raft quorum.", e);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800268 }
269 }
Madan Jampani08822c42014-11-04 17:17:46 -0800270 }
271
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800272 private void tryTableListing() throws InterruptedException {
Madan Jampani71582ed2014-11-18 10:06:01 -0800273 int retries = 0;
274 do {
275 try {
276 listTables();
277 return;
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800278 } catch (DatabaseException.Timeout e) {
279 log.debug("Failed to listTables. Will retry...", e);
Madan Jampani71582ed2014-11-18 10:06:01 -0800280 } catch (DatabaseException e) {
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800281 log.debug("Failed to listTables. Will retry later...", e);
282 Thread.sleep(RETRY_MS);
Yuta HIGUCHI01365622014-11-25 17:35:15 -0800283 }
284 if (retries == ACTIVATE_MAX_RETRIES) {
285 log.error("Failed to listTables after multiple attempts. Giving up.");
286 // Exiting hoping things will be fixed by the time
287 // others start using the service
288 return;
Madan Jampani71582ed2014-11-18 10:06:01 -0800289 }
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800290 retries++;
Madan Jampani71582ed2014-11-18 10:06:01 -0800291 } while (true);
292 }
293
Madan Jampani08822c42014-11-04 17:17:46 -0800294 @Override
295 public boolean createTable(String name) {
296 return client.createTable(name);
297 }
298
299 @Override
Madan Jampanidef2c652014-11-12 13:50:10 -0800300 public boolean createTable(String name, int ttlMillis) {
301 return client.createTable(name, ttlMillis);
302 }
303
304 @Override
Madan Jampani08822c42014-11-04 17:17:46 -0800305 public void dropTable(String name) {
306 client.dropTable(name);
307 }
308
309 @Override
310 public void dropAllTables() {
311 client.dropAllTables();
312 }
313
314 @Override
Madan Jampanif5d263b2014-11-13 10:04:40 -0800315 public Set<String> listTables() {
Madan Jampani08822c42014-11-04 17:17:46 -0800316 return client.listTables();
317 }
318
319 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800320 public VersionedValue get(String tableName, String key) {
321 BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
322 ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
323 if (readResult.status().equals(ReadStatus.OK)) {
324 return readResult.value();
Madan Jampani08822c42014-11-04 17:17:46 -0800325 }
Madan Jampani12390c12014-11-12 00:35:56 -0800326 throw new DatabaseException("get failed due to status: " + readResult.status());
Madan Jampani08822c42014-11-04 17:17:46 -0800327 }
328
329 @Override
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800330 public Map<String, VersionedValue> getAll(String tableName) {
331 return client.getAll(tableName);
332 }
333
334
335 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800336 public BatchReadResult batchRead(BatchReadRequest batchRequest) {
337 return new BatchReadResult(client.batchRead(batchRequest));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800338 }
339
340 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800341 public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
342 return new BatchWriteResult(client.batchWrite(batchRequest));
Madan Jampani08822c42014-11-04 17:17:46 -0800343 }
344
345 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800346 public VersionedValue put(String tableName, String key, byte[] value) {
347 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
348 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
349 if (writeResult.status().equals(WriteStatus.OK)) {
350 return writeResult.previousValue();
Madan Jampani08822c42014-11-04 17:17:46 -0800351 }
Madan Jampani12390c12014-11-12 00:35:56 -0800352 throw new DatabaseException("put failed due to status: " + writeResult.status());
353 }
Madan Jampani08822c42014-11-04 17:17:46 -0800354
Madan Jampani12390c12014-11-12 00:35:56 -0800355 @Override
356 public boolean putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800357 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
358 .putIfAbsent(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800359 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
360 if (writeResult.status().equals(WriteStatus.OK)) {
361 return true;
362 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
363 return false;
364 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800365 throw new DatabaseException("putIfAbsent failed due to status: "
366 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800367 }
368
369 @Override
370 public boolean putIfVersionMatches(String tableName, String key,
371 byte[] value, long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800372 BatchWriteRequest batchRequest =
373 new BatchWriteRequest.Builder()
374 .putIfVersionMatches(tableName, key, value, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800375 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
376 if (writeResult.status().equals(WriteStatus.OK)) {
377 return true;
378 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
379 return false;
380 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800381 throw new DatabaseException("putIfVersionMatches failed due to status: "
382 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800383 }
384
385 @Override
386 public boolean putIfValueMatches(String tableName, String key,
387 byte[] oldValue, byte[] newValue) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800388 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
389 .putIfValueMatches(tableName, key, oldValue, newValue).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800390 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
391 if (writeResult.status().equals(WriteStatus.OK)) {
392 return true;
393 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
394 return false;
395 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800396 throw new DatabaseException("putIfValueMatches failed due to status: "
397 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800398 }
399
400 @Override
401 public VersionedValue remove(String tableName, String key) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800402 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
403 .remove(tableName, key).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800404 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
405 if (writeResult.status().equals(WriteStatus.OK)) {
406 return writeResult.previousValue();
407 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800408 throw new DatabaseException("remove failed due to status: "
409 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800410 }
411
412 @Override
413 public boolean removeIfVersionMatches(String tableName, String key,
414 long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800415 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
416 .removeIfVersionMatches(tableName, key, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800417 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
418 if (writeResult.status().equals(WriteStatus.OK)) {
419 return true;
420 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
421 return false;
422 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800423 throw new DatabaseException("removeIfVersionMatches failed due to status: "
424 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800425 }
426
427 @Override
428 public boolean removeIfValueMatches(String tableName, String key,
429 byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800430 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
431 .removeIfValueMatches(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800432 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
433 if (writeResult.status().equals(WriteStatus.OK)) {
434 return true;
435 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
436 return false;
437 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800438 throw new DatabaseException("removeIfValueMatches failed due to status: "
439 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800440 }
441
442 @Override
443 public void addMember(final ControllerNode node) {
444 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
445 node.tcpPort());
446 log.info("{} was added to the cluster", tcpMember);
447 synchronized (clusterConfig) {
448 clusterConfig.addRemoteMember(tcpMember);
449 }
Madan Jampani08822c42014-11-04 17:17:46 -0800450 }
451
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800452 @Override
453 public Optional<ControllerNode> leader() {
454 if (copycat != null) {
455 if (copycat.isLeader()) {
456 return Optional.of(clusterService.getLocalNode());
457 }
458 Member leader = copycat.cluster().remoteMember(copycat.leader());
459 return Optional.ofNullable(getNodeIdFromMember(leader));
460 }
461 return Optional.ofNullable(getNodeIdFromMember(client.getCurrentLeader()));
462 }
463
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800464 private final class LeaderAdvertiser implements Runnable {
465
Madan Jampani5ce30252014-11-17 20:53:17 -0800466 @Override
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800467 public void run() {
Madan Jampani5ce30252014-11-17 20:53:17 -0800468 try {
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800469 LeaderElectEvent event = myLeaderEvent;
470 if (event != null) {
471 log.trace("Broadcasting RAFT_LEADER_ELECTION_EVENT: {}", event);
Madan Jampani5ce30252014-11-17 20:53:17 -0800472 // This node just became the leader.
473 clusterCommunicator.broadcastIncludeSelf(
474 new ClusterMessage(
475 clusterService.getLocalNode().id(),
476 RAFT_LEADER_ELECTION_EVENT,
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800477 ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
Madan Jampani5ce30252014-11-17 20:53:17 -0800478 }
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800479 } catch (Exception e) {
480 log.debug("LeaderAdvertiser failed with exception", e);
481 }
482 }
483
484 }
485
486 private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
487 @Override
488 public void handle(LeaderElectEvent event) {
Jonathan Hart7d656f42015-01-27 14:07:23 -0800489 log.debug("Received LeaderElectEvent: {}", event);
490 if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
491 log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
492 myLeaderEvent = event;
493 // This node just became the leader.
494 clusterCommunicator.broadcastIncludeSelf(
495 new ClusterMessage(
496 clusterService.getLocalNode().id(),
497 RAFT_LEADER_ELECTION_EVENT,
498 ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
499 } else {
500 if (myLeaderEvent != null) {
501 log.debug("This node is no longer the Leader");
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800502 }
Jonathan Hart7d656f42015-01-27 14:07:23 -0800503 myLeaderEvent = null;
Madan Jampani5ce30252014-11-17 20:53:17 -0800504 }
505 }
506 }
507
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800508 private final class InternalClusterEventListener
Madan Jampani12390c12014-11-12 00:35:56 -0800509 implements ClusterEventListener {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800510
511 @Override
512 public void event(ClusterEvent event) {
513 // TODO: Not every node should be part of the consensus ring.
514
515 final ControllerNode node = event.subject();
516 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
Madan Jampani12390c12014-11-12 00:35:56 -0800517 node.tcpPort());
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800518
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800519 switch (event.type()) {
520 case INSTANCE_ACTIVATED:
521 case INSTANCE_ADDED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800522 if (autoAddMember) {
523 synchronized (clusterConfig) {
524 if (!clusterConfig.getMembers().contains(tcpMember)) {
525 log.info("{} was automatically added to the cluster", tcpMember);
526 clusterConfig.addRemoteMember(tcpMember);
527 }
528 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800529 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800530 break;
531 case INSTANCE_DEACTIVATED:
532 case INSTANCE_REMOVED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800533 if (autoAddMember) {
534 Set<DefaultControllerNode> members
Madan Jampani12390c12014-11-12 00:35:56 -0800535 = tabletMembers.getOrDefault(DEFAULT_TABLET,
536 Collections.emptySet());
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800537 // remove only if not the initial members
538 if (!members.contains(node)) {
539 synchronized (clusterConfig) {
540 if (clusterConfig.getMembers().contains(tcpMember)) {
541 log.info("{} was automatically removed from the cluster", tcpMember);
542 clusterConfig.removeRemoteMember(tcpMember);
543 }
544 }
545 }
546 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800547 break;
548 default:
549 break;
550 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800551 if (copycat != null) {
552 log.debug("Current cluster: {}", copycat.cluster());
553 }
554 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800555 }
556
557 }
558
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800559 @Override
560 public void removeMember(final ControllerNode node) {
561 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
562 node.tcpPort());
563 log.info("{} was removed from the cluster", tcpMember);
564 synchronized (clusterConfig) {
565 clusterConfig.removeRemoteMember(tcpMember);
566 }
567 }
568
569 @Override
570 public Collection<ControllerNode> listMembers() {
571 if (copycat == null) {
572 return ImmutableList.of();
573 }
574 Set<ControllerNode> members = new HashSet<>();
575 for (Member member : copycat.cluster().members()) {
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800576 ControllerNode node = getNodeIdFromMember(member);
577 if (node == null) {
578 log.info("No Node found for {}", member);
579 continue;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800580 }
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800581 members.add(node);
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800582 }
583 return members;
584 }
585
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800586 private ControllerNode getNodeIdFromMember(Member member) {
587 if (member instanceof TcpMember) {
588 final TcpMember tcpMember = (TcpMember) member;
589 // TODO assuming tcpMember#host to be IP address,
590 // but if not lookup DNS, etc. first
591 IpAddress ip = IpAddress.valueOf(tcpMember.host());
592 int tcpPort = tcpMember.port();
593 for (ControllerNode node : clusterService.getNodes()) {
594 if (node.ip().equals(ip) &&
595 node.tcpPort() == tcpPort) {
596 return node;
597 }
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800598 }
599 }
600 return null;
601 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800602}