blob: 7467a4b8a3218f1673fbcccbd9bda393a4078f4b [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -08003import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
4import static org.onlab.util.Tools.namedThreads;
Madan Jampani08822c42014-11-04 17:17:46 -08005import static org.slf4j.LoggerFactory.getLogger;
6
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08007import java.io.File;
8import java.io.IOException;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08009import java.util.Collection;
10import java.util.Collections;
11import java.util.HashSet;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080012import java.util.Map;
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -080013import java.util.Optional;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080014import java.util.Set;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080015import java.util.concurrent.CountDownLatch;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080016import java.util.concurrent.ExecutionException;
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -080017import java.util.concurrent.ScheduledExecutorService;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080018import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080019
20import net.kuujo.copycat.Copycat;
Yuta HIGUCHI4e450812014-11-23 01:53:28 -080021import net.kuujo.copycat.CopycatConfig;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080022import net.kuujo.copycat.cluster.ClusterConfig;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080023import net.kuujo.copycat.cluster.Member;
Madan Jampani08822c42014-11-04 17:17:46 -080024import net.kuujo.copycat.cluster.TcpCluster;
25import net.kuujo.copycat.cluster.TcpClusterConfig;
26import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani5ce30252014-11-17 20:53:17 -080027import net.kuujo.copycat.event.EventHandler;
Madan Jampanif5d263b2014-11-13 10:04:40 -080028import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampani08822c42014-11-04 17:17:46 -080029import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080030
Madan Jampani08822c42014-11-04 17:17:46 -080031import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080032import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080033import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080034import org.apache.felix.scr.annotations.Reference;
35import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080036import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080037import org.onlab.onos.cluster.ClusterEvent;
38import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080039import org.onlab.onos.cluster.ClusterService;
40import org.onlab.onos.cluster.ControllerNode;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080041import org.onlab.onos.cluster.DefaultControllerNode;
Madan Jampanidef2c652014-11-12 13:50:10 -080042import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani5ce30252014-11-17 20:53:17 -080043import org.onlab.onos.store.cluster.messaging.ClusterMessage;
44import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampani12390c12014-11-12 00:35:56 -080045import org.onlab.onos.store.service.BatchReadRequest;
46import org.onlab.onos.store.service.BatchReadResult;
47import org.onlab.onos.store.service.BatchWriteRequest;
48import org.onlab.onos.store.service.BatchWriteResult;
Madan Jampani08822c42014-11-04 17:17:46 -080049import org.onlab.onos.store.service.DatabaseAdminService;
50import org.onlab.onos.store.service.DatabaseException;
51import org.onlab.onos.store.service.DatabaseService;
Madan Jampani08822c42014-11-04 17:17:46 -080052import org.onlab.onos.store.service.ReadResult;
Madan Jampani12390c12014-11-12 00:35:56 -080053import org.onlab.onos.store.service.ReadStatus;
54import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080055import org.onlab.onos.store.service.WriteResult;
Madan Jampani12390c12014-11-12 00:35:56 -080056import org.onlab.onos.store.service.WriteStatus;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080057import org.onlab.packet.IpAddress;
Madan Jampani08822c42014-11-04 17:17:46 -080058import org.slf4j.Logger;
59
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080060import com.google.common.collect.ImmutableList;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080061
Madan Jampani08822c42014-11-04 17:17:46 -080062/**
63 * Strongly consistent and durable state management service based on
64 * Copycat implementation of Raft consensus protocol.
65 */
Brian O'Connor5d55ed42014-12-01 18:27:47 -080066@Component(immediate = false)
Madan Jampanidfbfa182014-11-04 22:06:41 -080067@Service
Madan Jampani08822c42014-11-04 17:17:46 -080068public class DatabaseManager implements DatabaseService, DatabaseAdminService {
69
Yuta HIGUCHI28052222014-11-20 16:45:32 -080070 private static final int RETRY_MS = 500;
71
72 private static final int ACTIVATE_MAX_RETRIES = 100;
73
Madan Jampani08822c42014-11-04 17:17:46 -080074 private final Logger log = getLogger(getClass());
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080077 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080078
Madan Jampani9b19a822014-11-04 21:37:13 -080079 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanidef2c652014-11-12 13:50:10 -080080 protected ClusterCommunicationService clusterCommunicator;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080083 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080084
Yuta HIGUCHI43e3a7e2014-11-30 23:22:11 -080085 public static final String LOG_FILE_PREFIX = "raft/onos-copy-cat-log_";
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080086
87 // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
Pavlin Radoslavov190f8f92014-11-11 15:56:14 -080088 // TODO: Set the path to /opt/onos/config
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080089 private static final String CONFIG_DIR = "../config";
90
91 private static final String DEFAULT_MEMBER_FILE = "tablets.json";
92
93 private static final String DEFAULT_TABLET = "default";
94
95 // TODO: make this configurable
96 // initial member configuration file path
97 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
Madan Jampani08822c42014-11-04 17:17:46 -080098
Madan Jampani5ce30252014-11-17 20:53:17 -080099 public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
100 new MessageSubject("raft-leader-election-event");
101
Madan Jampani08822c42014-11-04 17:17:46 -0800102 private Copycat copycat;
103 private DatabaseClient client;
104
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800105 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800106 private ClusterConfig<TcpMember> clusterConfig;
107
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800108 private CountDownLatch clusterEventLatch;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800109 private ClusterEventListener clusterEventListener;
110
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800111 private Map<String, Set<DefaultControllerNode>> tabletMembers;
112
113 private boolean autoAddMember = false;
114
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800115 private ScheduledExecutorService executor;
116
117 private volatile LeaderElectEvent myLeaderEvent = null;
118
Yuta HIGUCHId88400b2014-11-25 12:13:30 -0800119 // TODO make this configurable
Madan Jampani4bb70c62014-11-25 23:47:12 -0800120 private int maxLogSizeBytes = 128 * (1024 * 1024);
Yuta HIGUCHI4e450812014-11-23 01:53:28 -0800121
Yuta HIGUCHId88400b2014-11-25 12:13:30 -0800122 // TODO make this configurable
123 private long electionTimeoutMs = 5000; // CopyCat default: 2000
124
Madan Jampani08822c42014-11-04 17:17:46 -0800125 @Activate
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800126 public void activate() throws InterruptedException, ExecutionException {
Madan Jampanidfbfa182014-11-04 22:06:41 -0800127
Yuta HIGUCHI43e3a7e2014-11-30 23:22:11 -0800128 // KARAF_DATA
129 // http://karaf.apache.org/manual/latest/users-guide/start-stop.html
130 final String dataDir = System.getProperty("karaf.data", "./data");
131
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800132 // load tablet configuration
133 File file = new File(CONFIG_DIR, initialMemberConfig);
134 log.info("Loading config: {}", file.getAbsolutePath());
135 TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
136 try {
137 tabletMembers = tabletDef.read();
138 } catch (IOException e) {
139 log.error("Failed to load tablet config {}", file);
140 throw new IllegalStateException("Failed to load tablet config", e);
141 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800142
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800143 // load default tablet configuration and start copycat
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800144 clusterConfig = new TcpClusterConfig();
Madan Jampani5ce30252014-11-17 20:53:17 -0800145 Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
146 if (defaultMembers == null || defaultMembers.isEmpty()) {
147 log.error("No members found in [{}] tablet configuration.",
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800148 DEFAULT_TABLET);
149 throw new IllegalStateException("No member found in tablet configuration");
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800150
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800151 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800152
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800153 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani5ce30252014-11-17 20:53:17 -0800154 for (ControllerNode member : defaultMembers) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800155 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
156 member.tcpPort());
157 if (localNode.equals(member)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800158 clusterConfig.setLocalMember(tcpMember);
159 } else {
160 clusterConfig.addRemoteMember(tcpMember);
161 }
162 }
163
Madan Jampani5ce30252014-11-17 20:53:17 -0800164 if (clusterConfig.getLocalMember() != null) {
165
166 // Wait for a minimum viable Raft cluster to boot up.
167 waitForClusterQuorum();
168
169 final TcpCluster cluster;
170 synchronized (clusterConfig) {
171 // Create the cluster.
172 cluster = new TcpCluster(clusterConfig);
173 }
174 log.info("Starting cluster: {}", cluster);
175
176 DatabaseEntryExpirationTracker expirationTracker =
177 new DatabaseEntryExpirationTracker(
178 clusterConfig.getLocalMember(),
179 clusterService.getLocalNode(),
180 clusterCommunicator,
181 this);
182
183 DatabaseStateMachine stateMachine = new DatabaseStateMachine();
184 stateMachine.addEventListener(expirationTracker);
Yuta HIGUCHI43e3a7e2014-11-30 23:22:11 -0800185 Log consensusLog = new MapDBLog(dataDir + "/" + LOG_FILE_PREFIX + localNode.id(),
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800186 ClusterMessagingProtocol.DB_SERIALIZER);
Madan Jampani5ce30252014-11-17 20:53:17 -0800187
Yuta HIGUCHI4e450812014-11-23 01:53:28 -0800188 CopycatConfig ccConfig = new CopycatConfig();
189 ccConfig.setMaxLogSize(maxLogSizeBytes);
Yuta HIGUCHId88400b2014-11-25 12:13:30 -0800190 ccConfig.setElectionTimeout(electionTimeoutMs);
Yuta HIGUCHI4e450812014-11-23 01:53:28 -0800191
192 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol, ccConfig);
Madan Jampani5ce30252014-11-17 20:53:17 -0800193 copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
194 copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
195 }
196
197 client = new DatabaseClient(copycatMessagingProtocol);
198 clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
199
200 // Starts copycat if this node is a participant
201 // of the Raft cluster.
202 if (copycat != null) {
203 copycat.start().get();
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800204
205 executor =
206 newSingleThreadScheduledExecutor(namedThreads("db-heartbeat-%d"));
207 executor.scheduleWithFixedDelay(new LeaderAdvertiser(), 5, 2, TimeUnit.SECONDS);
208
Madan Jampani5ce30252014-11-17 20:53:17 -0800209 }
210
211 client.waitForLeader();
Madan Jampani71582ed2014-11-18 10:06:01 -0800212
213 // Try and list the tables to verify database manager is
214 // in a state where it can serve requests.
215 tryTableListing();
216
Madan Jampani5ce30252014-11-17 20:53:17 -0800217 log.info("Started.");
218 }
219
220 @Deactivate
221 public void deactivate() {
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800222 if (executor != null) {
223 executor.shutdownNow();
224 }
Madan Jampani5ce30252014-11-17 20:53:17 -0800225 clusterService.removeListener(clusterEventListener);
226 // TODO: ClusterCommunicationService must support more than one
227 // handler per message subject.
228 clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
229 if (copycat != null) {
230 copycat.stop();
231 }
232 log.info("Stopped.");
233 }
234
235 private void waitForClusterQuorum() {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800236 // note: from this point beyond, clusterConfig requires synchronization
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800237 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800238 clusterEventListener = new InternalClusterEventListener();
239 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800240
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800241 final int raftClusterSize = clusterConfig.getMembers().size();
242 final int raftClusterQuorumSize = (int) (Math.floor(raftClusterSize / 2)) + 1;
243 if (clusterService.getNodes().size() < raftClusterQuorumSize) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800244 // current cluster size smaller then expected
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800245 try {
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800246 final int waitTimeSec = 120;
247 log.info("Waiting for a maximum of {}s for raft cluster quorum to boot up...", waitTimeSec);
248 if (!clusterEventLatch.await(waitTimeSec, TimeUnit.SECONDS)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800249 log.info("Starting with {}/{} nodes cluster",
250 clusterService.getNodes().size(),
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800251 raftClusterSize);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800252 }
253 } catch (InterruptedException e) {
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800254 log.info("Interrupted waiting for raft quorum.", e);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800255 }
256 }
Madan Jampani08822c42014-11-04 17:17:46 -0800257 }
258
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800259 private void tryTableListing() throws InterruptedException {
Madan Jampani71582ed2014-11-18 10:06:01 -0800260 int retries = 0;
261 do {
262 try {
263 listTables();
264 return;
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800265 } catch (DatabaseException.Timeout e) {
266 log.debug("Failed to listTables. Will retry...", e);
Madan Jampani71582ed2014-11-18 10:06:01 -0800267 } catch (DatabaseException e) {
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800268 log.debug("Failed to listTables. Will retry later...", e);
269 Thread.sleep(RETRY_MS);
Yuta HIGUCHI01365622014-11-25 17:35:15 -0800270 }
271 if (retries == ACTIVATE_MAX_RETRIES) {
272 log.error("Failed to listTables after multiple attempts. Giving up.");
273 // Exiting hoping things will be fixed by the time
274 // others start using the service
275 return;
Madan Jampani71582ed2014-11-18 10:06:01 -0800276 }
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800277 retries++;
Madan Jampani71582ed2014-11-18 10:06:01 -0800278 } while (true);
279 }
280
Madan Jampani08822c42014-11-04 17:17:46 -0800281 @Override
282 public boolean createTable(String name) {
283 return client.createTable(name);
284 }
285
286 @Override
Madan Jampanidef2c652014-11-12 13:50:10 -0800287 public boolean createTable(String name, int ttlMillis) {
288 return client.createTable(name, ttlMillis);
289 }
290
291 @Override
Madan Jampani08822c42014-11-04 17:17:46 -0800292 public void dropTable(String name) {
293 client.dropTable(name);
294 }
295
296 @Override
297 public void dropAllTables() {
298 client.dropAllTables();
299 }
300
301 @Override
Madan Jampanif5d263b2014-11-13 10:04:40 -0800302 public Set<String> listTables() {
Madan Jampani08822c42014-11-04 17:17:46 -0800303 return client.listTables();
304 }
305
306 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800307 public VersionedValue get(String tableName, String key) {
308 BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
309 ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
310 if (readResult.status().equals(ReadStatus.OK)) {
311 return readResult.value();
Madan Jampani08822c42014-11-04 17:17:46 -0800312 }
Madan Jampani12390c12014-11-12 00:35:56 -0800313 throw new DatabaseException("get failed due to status: " + readResult.status());
Madan Jampani08822c42014-11-04 17:17:46 -0800314 }
315
316 @Override
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800317 public Map<String, VersionedValue> getAll(String tableName) {
318 return client.getAll(tableName);
319 }
320
321
322 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800323 public BatchReadResult batchRead(BatchReadRequest batchRequest) {
324 return new BatchReadResult(client.batchRead(batchRequest));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800325 }
326
327 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800328 public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
329 return new BatchWriteResult(client.batchWrite(batchRequest));
Madan Jampani08822c42014-11-04 17:17:46 -0800330 }
331
332 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800333 public VersionedValue put(String tableName, String key, byte[] value) {
334 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
335 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
336 if (writeResult.status().equals(WriteStatus.OK)) {
337 return writeResult.previousValue();
Madan Jampani08822c42014-11-04 17:17:46 -0800338 }
Madan Jampani12390c12014-11-12 00:35:56 -0800339 throw new DatabaseException("put failed due to status: " + writeResult.status());
340 }
Madan Jampani08822c42014-11-04 17:17:46 -0800341
Madan Jampani12390c12014-11-12 00:35:56 -0800342 @Override
343 public boolean putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800344 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
345 .putIfAbsent(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800346 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
347 if (writeResult.status().equals(WriteStatus.OK)) {
348 return true;
349 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
350 return false;
351 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800352 throw new DatabaseException("putIfAbsent failed due to status: "
353 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800354 }
355
356 @Override
357 public boolean putIfVersionMatches(String tableName, String key,
358 byte[] value, long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800359 BatchWriteRequest batchRequest =
360 new BatchWriteRequest.Builder()
361 .putIfVersionMatches(tableName, key, value, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800362 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
363 if (writeResult.status().equals(WriteStatus.OK)) {
364 return true;
365 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
366 return false;
367 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800368 throw new DatabaseException("putIfVersionMatches failed due to status: "
369 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800370 }
371
372 @Override
373 public boolean putIfValueMatches(String tableName, String key,
374 byte[] oldValue, byte[] newValue) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800375 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
376 .putIfValueMatches(tableName, key, oldValue, newValue).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800377 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
378 if (writeResult.status().equals(WriteStatus.OK)) {
379 return true;
380 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
381 return false;
382 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800383 throw new DatabaseException("putIfValueMatches failed due to status: "
384 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800385 }
386
387 @Override
388 public VersionedValue remove(String tableName, String key) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800389 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
390 .remove(tableName, key).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800391 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
392 if (writeResult.status().equals(WriteStatus.OK)) {
393 return writeResult.previousValue();
394 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800395 throw new DatabaseException("remove failed due to status: "
396 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800397 }
398
399 @Override
400 public boolean removeIfVersionMatches(String tableName, String key,
401 long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800402 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
403 .removeIfVersionMatches(tableName, key, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800404 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
405 if (writeResult.status().equals(WriteStatus.OK)) {
406 return true;
407 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
408 return false;
409 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800410 throw new DatabaseException("removeIfVersionMatches failed due to status: "
411 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800412 }
413
414 @Override
415 public boolean removeIfValueMatches(String tableName, String key,
416 byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800417 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
418 .removeIfValueMatches(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800419 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
420 if (writeResult.status().equals(WriteStatus.OK)) {
421 return true;
422 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
423 return false;
424 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800425 throw new DatabaseException("removeIfValueMatches failed due to status: "
426 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800427 }
428
429 @Override
430 public void addMember(final ControllerNode node) {
431 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
432 node.tcpPort());
433 log.info("{} was added to the cluster", tcpMember);
434 synchronized (clusterConfig) {
435 clusterConfig.addRemoteMember(tcpMember);
436 }
Madan Jampani08822c42014-11-04 17:17:46 -0800437 }
438
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800439 @Override
440 public Optional<ControllerNode> leader() {
441 if (copycat != null) {
442 if (copycat.isLeader()) {
443 return Optional.of(clusterService.getLocalNode());
444 }
445 Member leader = copycat.cluster().remoteMember(copycat.leader());
446 return Optional.ofNullable(getNodeIdFromMember(leader));
447 }
448 return Optional.ofNullable(getNodeIdFromMember(client.getCurrentLeader()));
449 }
450
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800451 private final class LeaderAdvertiser implements Runnable {
452
Madan Jampani5ce30252014-11-17 20:53:17 -0800453 @Override
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800454 public void run() {
Madan Jampani5ce30252014-11-17 20:53:17 -0800455 try {
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800456 LeaderElectEvent event = myLeaderEvent;
457 if (event != null) {
458 log.trace("Broadcasting RAFT_LEADER_ELECTION_EVENT: {}", event);
Madan Jampani5ce30252014-11-17 20:53:17 -0800459 // This node just became the leader.
460 clusterCommunicator.broadcastIncludeSelf(
461 new ClusterMessage(
462 clusterService.getLocalNode().id(),
463 RAFT_LEADER_ELECTION_EVENT,
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800464 ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
Madan Jampani5ce30252014-11-17 20:53:17 -0800465 }
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800466 } catch (Exception e) {
467 log.debug("LeaderAdvertiser failed with exception", e);
468 }
469 }
470
471 }
472
473 private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
474 @Override
475 public void handle(LeaderElectEvent event) {
476 try {
477 log.debug("Received LeaderElectEvent: {}", event);
478 if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
479 log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
480 myLeaderEvent = event;
481 // This node just became the leader.
482 clusterCommunicator.broadcastIncludeSelf(
483 new ClusterMessage(
484 clusterService.getLocalNode().id(),
485 RAFT_LEADER_ELECTION_EVENT,
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800486 ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800487 } else {
488 if (myLeaderEvent != null) {
489 log.debug("This node is no longer the Leader");
490 }
491 myLeaderEvent = null;
492 }
Madan Jampani5ce30252014-11-17 20:53:17 -0800493 } catch (IOException e) {
494 log.error("Failed to broadcast raft leadership change event", e);
495 }
496 }
497 }
498
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800499 private final class InternalClusterEventListener
Madan Jampani12390c12014-11-12 00:35:56 -0800500 implements ClusterEventListener {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800501
502 @Override
503 public void event(ClusterEvent event) {
504 // TODO: Not every node should be part of the consensus ring.
505
506 final ControllerNode node = event.subject();
507 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
Madan Jampani12390c12014-11-12 00:35:56 -0800508 node.tcpPort());
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800509
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800510 switch (event.type()) {
511 case INSTANCE_ACTIVATED:
512 case INSTANCE_ADDED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800513 if (autoAddMember) {
514 synchronized (clusterConfig) {
515 if (!clusterConfig.getMembers().contains(tcpMember)) {
516 log.info("{} was automatically added to the cluster", tcpMember);
517 clusterConfig.addRemoteMember(tcpMember);
518 }
519 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800520 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800521 break;
522 case INSTANCE_DEACTIVATED:
523 case INSTANCE_REMOVED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800524 if (autoAddMember) {
525 Set<DefaultControllerNode> members
Madan Jampani12390c12014-11-12 00:35:56 -0800526 = tabletMembers.getOrDefault(DEFAULT_TABLET,
527 Collections.emptySet());
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800528 // remove only if not the initial members
529 if (!members.contains(node)) {
530 synchronized (clusterConfig) {
531 if (clusterConfig.getMembers().contains(tcpMember)) {
532 log.info("{} was automatically removed from the cluster", tcpMember);
533 clusterConfig.removeRemoteMember(tcpMember);
534 }
535 }
536 }
537 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800538 break;
539 default:
540 break;
541 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800542 if (copycat != null) {
543 log.debug("Current cluster: {}", copycat.cluster());
544 }
545 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800546 }
547
548 }
549
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800550 @Override
551 public void removeMember(final ControllerNode node) {
552 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
553 node.tcpPort());
554 log.info("{} was removed from the cluster", tcpMember);
555 synchronized (clusterConfig) {
556 clusterConfig.removeRemoteMember(tcpMember);
557 }
558 }
559
560 @Override
561 public Collection<ControllerNode> listMembers() {
562 if (copycat == null) {
563 return ImmutableList.of();
564 }
565 Set<ControllerNode> members = new HashSet<>();
566 for (Member member : copycat.cluster().members()) {
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800567 ControllerNode node = getNodeIdFromMember(member);
568 if (node == null) {
569 log.info("No Node found for {}", member);
570 continue;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800571 }
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800572 members.add(node);
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800573 }
574 return members;
575 }
576
Yuta HIGUCHIf0f2dfc2014-11-26 13:59:07 -0800577 private ControllerNode getNodeIdFromMember(Member member) {
578 if (member instanceof TcpMember) {
579 final TcpMember tcpMember = (TcpMember) member;
580 // TODO assuming tcpMember#host to be IP address,
581 // but if not lookup DNS, etc. first
582 IpAddress ip = IpAddress.valueOf(tcpMember.host());
583 int tcpPort = tcpMember.port();
584 for (ControllerNode node : clusterService.getNodes()) {
585 if (node.ip().equals(ip) &&
586 node.tcpPort() == tcpPort) {
587 return node;
588 }
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800589 }
590 }
591 return null;
592 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800593}