blob: 31f4bf0e72490757a78f4e1626b60254df72ec3a [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -08003import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
4import static org.onlab.util.Tools.namedThreads;
Madan Jampani08822c42014-11-04 17:17:46 -08005import static org.slf4j.LoggerFactory.getLogger;
6
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08007import java.io.File;
8import java.io.IOException;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08009import java.util.Collection;
10import java.util.Collections;
11import java.util.HashSet;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080012import java.util.Map;
13import java.util.Set;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080014import java.util.concurrent.CountDownLatch;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080015import java.util.concurrent.ExecutionException;
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -080016import java.util.concurrent.ScheduledExecutorService;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080017import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080018
19import net.kuujo.copycat.Copycat;
Yuta HIGUCHI4e450812014-11-23 01:53:28 -080020import net.kuujo.copycat.CopycatConfig;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080021import net.kuujo.copycat.cluster.ClusterConfig;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080022import net.kuujo.copycat.cluster.Member;
Madan Jampani08822c42014-11-04 17:17:46 -080023import net.kuujo.copycat.cluster.TcpCluster;
24import net.kuujo.copycat.cluster.TcpClusterConfig;
25import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani5ce30252014-11-17 20:53:17 -080026import net.kuujo.copycat.event.EventHandler;
Madan Jampanif5d263b2014-11-13 10:04:40 -080027import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampani08822c42014-11-04 17:17:46 -080028import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080029
Madan Jampani08822c42014-11-04 17:17:46 -080030import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080031import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080032import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080033import org.apache.felix.scr.annotations.Reference;
34import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080035import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080036import org.onlab.onos.cluster.ClusterEvent;
37import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080038import org.onlab.onos.cluster.ClusterService;
39import org.onlab.onos.cluster.ControllerNode;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080040import org.onlab.onos.cluster.DefaultControllerNode;
41import org.onlab.onos.cluster.NodeId;
Madan Jampanidef2c652014-11-12 13:50:10 -080042import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani5ce30252014-11-17 20:53:17 -080043import org.onlab.onos.store.cluster.messaging.ClusterMessage;
44import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampani12390c12014-11-12 00:35:56 -080045import org.onlab.onos.store.service.BatchReadRequest;
46import org.onlab.onos.store.service.BatchReadResult;
47import org.onlab.onos.store.service.BatchWriteRequest;
48import org.onlab.onos.store.service.BatchWriteResult;
Madan Jampani08822c42014-11-04 17:17:46 -080049import org.onlab.onos.store.service.DatabaseAdminService;
50import org.onlab.onos.store.service.DatabaseException;
51import org.onlab.onos.store.service.DatabaseService;
Madan Jampani08822c42014-11-04 17:17:46 -080052import org.onlab.onos.store.service.ReadResult;
Madan Jampani12390c12014-11-12 00:35:56 -080053import org.onlab.onos.store.service.ReadStatus;
54import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080055import org.onlab.onos.store.service.WriteResult;
Madan Jampani12390c12014-11-12 00:35:56 -080056import org.onlab.onos.store.service.WriteStatus;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080057import org.onlab.packet.IpAddress;
Madan Jampani08822c42014-11-04 17:17:46 -080058import org.slf4j.Logger;
59
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080060import com.google.common.collect.ImmutableList;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080061
Madan Jampani08822c42014-11-04 17:17:46 -080062/**
63 * Strongly consistent and durable state management service based on
64 * Copycat implementation of Raft consensus protocol.
65 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080066@Component(immediate = true)
67@Service
Madan Jampani08822c42014-11-04 17:17:46 -080068public class DatabaseManager implements DatabaseService, DatabaseAdminService {
69
Yuta HIGUCHI28052222014-11-20 16:45:32 -080070 private static final int RETRY_MS = 500;
71
72 private static final int ACTIVATE_MAX_RETRIES = 100;
73
Madan Jampani08822c42014-11-04 17:17:46 -080074 private final Logger log = getLogger(getClass());
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080077 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080078
Madan Jampani9b19a822014-11-04 21:37:13 -080079 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanidef2c652014-11-12 13:50:10 -080080 protected ClusterCommunicationService clusterCommunicator;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080083 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080084
Yuta HIGUCHI13a6f5a2014-11-12 10:07:47 -080085 // FIXME: point to appropriate path
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080086 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log_";
87
88 // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
Pavlin Radoslavov190f8f92014-11-11 15:56:14 -080089 // TODO: Set the path to /opt/onos/config
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080090 private static final String CONFIG_DIR = "../config";
91
92 private static final String DEFAULT_MEMBER_FILE = "tablets.json";
93
94 private static final String DEFAULT_TABLET = "default";
95
96 // TODO: make this configurable
97 // initial member configuration file path
98 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
Madan Jampani08822c42014-11-04 17:17:46 -080099
Madan Jampani5ce30252014-11-17 20:53:17 -0800100 public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
101 new MessageSubject("raft-leader-election-event");
102
Madan Jampani08822c42014-11-04 17:17:46 -0800103 private Copycat copycat;
104 private DatabaseClient client;
105
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800106 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800107 private ClusterConfig<TcpMember> clusterConfig;
108
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800109 private CountDownLatch clusterEventLatch;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800110 private ClusterEventListener clusterEventListener;
111
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800112 private Map<String, Set<DefaultControllerNode>> tabletMembers;
113
114 private boolean autoAddMember = false;
115
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800116 private ScheduledExecutorService executor;
117
118 private volatile LeaderElectEvent myLeaderEvent = null;
119
Yuta HIGUCHId88400b2014-11-25 12:13:30 -0800120 // TODO make this configurable
Madan Jampani4bb70c62014-11-25 23:47:12 -0800121 private int maxLogSizeBytes = 128 * (1024 * 1024);
Yuta HIGUCHI4e450812014-11-23 01:53:28 -0800122
Yuta HIGUCHId88400b2014-11-25 12:13:30 -0800123 // TODO make this configurable
124 private long electionTimeoutMs = 5000; // CopyCat default: 2000
125
Madan Jampani08822c42014-11-04 17:17:46 -0800126 @Activate
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800127 public void activate() throws InterruptedException, ExecutionException {
Madan Jampanidfbfa182014-11-04 22:06:41 -0800128
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800129 // load tablet configuration
130 File file = new File(CONFIG_DIR, initialMemberConfig);
131 log.info("Loading config: {}", file.getAbsolutePath());
132 TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
133 try {
134 tabletMembers = tabletDef.read();
135 } catch (IOException e) {
136 log.error("Failed to load tablet config {}", file);
137 throw new IllegalStateException("Failed to load tablet config", e);
138 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800139
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800140 // load default tablet configuration and start copycat
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800141 clusterConfig = new TcpClusterConfig();
Madan Jampani5ce30252014-11-17 20:53:17 -0800142 Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
143 if (defaultMembers == null || defaultMembers.isEmpty()) {
144 log.error("No members found in [{}] tablet configuration.",
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800145 DEFAULT_TABLET);
146 throw new IllegalStateException("No member found in tablet configuration");
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800147
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800148 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800149
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800150 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani5ce30252014-11-17 20:53:17 -0800151 for (ControllerNode member : defaultMembers) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800152 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
153 member.tcpPort());
154 if (localNode.equals(member)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800155 clusterConfig.setLocalMember(tcpMember);
156 } else {
157 clusterConfig.addRemoteMember(tcpMember);
158 }
159 }
160
Madan Jampani5ce30252014-11-17 20:53:17 -0800161 if (clusterConfig.getLocalMember() != null) {
162
163 // Wait for a minimum viable Raft cluster to boot up.
164 waitForClusterQuorum();
165
166 final TcpCluster cluster;
167 synchronized (clusterConfig) {
168 // Create the cluster.
169 cluster = new TcpCluster(clusterConfig);
170 }
171 log.info("Starting cluster: {}", cluster);
172
173 DatabaseEntryExpirationTracker expirationTracker =
174 new DatabaseEntryExpirationTracker(
175 clusterConfig.getLocalMember(),
176 clusterService.getLocalNode(),
177 clusterCommunicator,
178 this);
179
180 DatabaseStateMachine stateMachine = new DatabaseStateMachine();
181 stateMachine.addEventListener(expirationTracker);
182 Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800183 ClusterMessagingProtocol.DB_SERIALIZER);
Madan Jampani5ce30252014-11-17 20:53:17 -0800184
Yuta HIGUCHI4e450812014-11-23 01:53:28 -0800185 CopycatConfig ccConfig = new CopycatConfig();
186 ccConfig.setMaxLogSize(maxLogSizeBytes);
Yuta HIGUCHId88400b2014-11-25 12:13:30 -0800187 ccConfig.setElectionTimeout(electionTimeoutMs);
Yuta HIGUCHI4e450812014-11-23 01:53:28 -0800188
189 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol, ccConfig);
Madan Jampani5ce30252014-11-17 20:53:17 -0800190 copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
191 copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
192 }
193
194 client = new DatabaseClient(copycatMessagingProtocol);
195 clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
196
197 // Starts copycat if this node is a participant
198 // of the Raft cluster.
199 if (copycat != null) {
200 copycat.start().get();
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800201
202 executor =
203 newSingleThreadScheduledExecutor(namedThreads("db-heartbeat-%d"));
204 executor.scheduleWithFixedDelay(new LeaderAdvertiser(), 5, 2, TimeUnit.SECONDS);
205
Madan Jampani5ce30252014-11-17 20:53:17 -0800206 }
207
208 client.waitForLeader();
Madan Jampani71582ed2014-11-18 10:06:01 -0800209
210 // Try and list the tables to verify database manager is
211 // in a state where it can serve requests.
212 tryTableListing();
213
Madan Jampani5ce30252014-11-17 20:53:17 -0800214 log.info("Started.");
215 }
216
217 @Deactivate
218 public void deactivate() {
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800219 if (executor != null) {
220 executor.shutdownNow();
221 }
Madan Jampani5ce30252014-11-17 20:53:17 -0800222 clusterService.removeListener(clusterEventListener);
223 // TODO: ClusterCommunicationService must support more than one
224 // handler per message subject.
225 clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
226 if (copycat != null) {
227 copycat.stop();
228 }
229 log.info("Stopped.");
230 }
231
232 private void waitForClusterQuorum() {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800233 // note: from this point beyond, clusterConfig requires synchronization
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800234 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800235 clusterEventListener = new InternalClusterEventListener();
236 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800237
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800238 final int raftClusterSize = clusterConfig.getMembers().size();
239 final int raftClusterQuorumSize = (int) (Math.floor(raftClusterSize / 2)) + 1;
240 if (clusterService.getNodes().size() < raftClusterQuorumSize) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800241 // current cluster size smaller then expected
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800242 try {
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800243 final int waitTimeSec = 120;
244 log.info("Waiting for a maximum of {}s for raft cluster quorum to boot up...", waitTimeSec);
245 if (!clusterEventLatch.await(waitTimeSec, TimeUnit.SECONDS)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800246 log.info("Starting with {}/{} nodes cluster",
247 clusterService.getNodes().size(),
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800248 raftClusterSize);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800249 }
250 } catch (InterruptedException e) {
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800251 log.info("Interrupted waiting for raft quorum.", e);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800252 }
253 }
Madan Jampani08822c42014-11-04 17:17:46 -0800254 }
255
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800256 private void tryTableListing() throws InterruptedException {
Madan Jampani71582ed2014-11-18 10:06:01 -0800257 int retries = 0;
258 do {
259 try {
260 listTables();
261 return;
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800262 } catch (DatabaseException.Timeout e) {
263 log.debug("Failed to listTables. Will retry...", e);
Madan Jampani71582ed2014-11-18 10:06:01 -0800264 } catch (DatabaseException e) {
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800265 log.debug("Failed to listTables. Will retry later...", e);
266 Thread.sleep(RETRY_MS);
Yuta HIGUCHI01365622014-11-25 17:35:15 -0800267 }
268 if (retries == ACTIVATE_MAX_RETRIES) {
269 log.error("Failed to listTables after multiple attempts. Giving up.");
270 // Exiting hoping things will be fixed by the time
271 // others start using the service
272 return;
Madan Jampani71582ed2014-11-18 10:06:01 -0800273 }
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800274 retries++;
Madan Jampani71582ed2014-11-18 10:06:01 -0800275 } while (true);
276 }
277
Madan Jampani08822c42014-11-04 17:17:46 -0800278 @Override
279 public boolean createTable(String name) {
280 return client.createTable(name);
281 }
282
283 @Override
Madan Jampanidef2c652014-11-12 13:50:10 -0800284 public boolean createTable(String name, int ttlMillis) {
285 return client.createTable(name, ttlMillis);
286 }
287
288 @Override
Madan Jampani08822c42014-11-04 17:17:46 -0800289 public void dropTable(String name) {
290 client.dropTable(name);
291 }
292
293 @Override
294 public void dropAllTables() {
295 client.dropAllTables();
296 }
297
298 @Override
Madan Jampanif5d263b2014-11-13 10:04:40 -0800299 public Set<String> listTables() {
Madan Jampani08822c42014-11-04 17:17:46 -0800300 return client.listTables();
301 }
302
303 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800304 public VersionedValue get(String tableName, String key) {
305 BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
306 ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
307 if (readResult.status().equals(ReadStatus.OK)) {
308 return readResult.value();
Madan Jampani08822c42014-11-04 17:17:46 -0800309 }
Madan Jampani12390c12014-11-12 00:35:56 -0800310 throw new DatabaseException("get failed due to status: " + readResult.status());
Madan Jampani08822c42014-11-04 17:17:46 -0800311 }
312
313 @Override
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800314 public Map<String, VersionedValue> getAll(String tableName) {
315 return client.getAll(tableName);
316 }
317
318
319 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800320 public BatchReadResult batchRead(BatchReadRequest batchRequest) {
321 return new BatchReadResult(client.batchRead(batchRequest));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800322 }
323
324 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800325 public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
326 return new BatchWriteResult(client.batchWrite(batchRequest));
Madan Jampani08822c42014-11-04 17:17:46 -0800327 }
328
329 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800330 public VersionedValue put(String tableName, String key, byte[] value) {
331 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
332 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
333 if (writeResult.status().equals(WriteStatus.OK)) {
334 return writeResult.previousValue();
Madan Jampani08822c42014-11-04 17:17:46 -0800335 }
Madan Jampani12390c12014-11-12 00:35:56 -0800336 throw new DatabaseException("put failed due to status: " + writeResult.status());
337 }
Madan Jampani08822c42014-11-04 17:17:46 -0800338
Madan Jampani12390c12014-11-12 00:35:56 -0800339 @Override
340 public boolean putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800341 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
342 .putIfAbsent(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800343 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
344 if (writeResult.status().equals(WriteStatus.OK)) {
345 return true;
346 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
347 return false;
348 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800349 throw new DatabaseException("putIfAbsent failed due to status: "
350 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800351 }
352
353 @Override
354 public boolean putIfVersionMatches(String tableName, String key,
355 byte[] value, long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800356 BatchWriteRequest batchRequest =
357 new BatchWriteRequest.Builder()
358 .putIfVersionMatches(tableName, key, value, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800359 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
360 if (writeResult.status().equals(WriteStatus.OK)) {
361 return true;
362 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
363 return false;
364 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800365 throw new DatabaseException("putIfVersionMatches failed due to status: "
366 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800367 }
368
369 @Override
370 public boolean putIfValueMatches(String tableName, String key,
371 byte[] oldValue, byte[] newValue) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800372 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
373 .putIfValueMatches(tableName, key, oldValue, newValue).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800374 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
375 if (writeResult.status().equals(WriteStatus.OK)) {
376 return true;
377 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
378 return false;
379 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800380 throw new DatabaseException("putIfValueMatches failed due to status: "
381 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800382 }
383
384 @Override
385 public VersionedValue remove(String tableName, String key) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800386 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
387 .remove(tableName, key).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800388 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
389 if (writeResult.status().equals(WriteStatus.OK)) {
390 return writeResult.previousValue();
391 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800392 throw new DatabaseException("remove failed due to status: "
393 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800394 }
395
396 @Override
397 public boolean removeIfVersionMatches(String tableName, String key,
398 long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800399 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
400 .removeIfVersionMatches(tableName, key, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800401 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
402 if (writeResult.status().equals(WriteStatus.OK)) {
403 return true;
404 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
405 return false;
406 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800407 throw new DatabaseException("removeIfVersionMatches failed due to status: "
408 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800409 }
410
411 @Override
412 public boolean removeIfValueMatches(String tableName, String key,
413 byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800414 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
415 .removeIfValueMatches(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800416 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
417 if (writeResult.status().equals(WriteStatus.OK)) {
418 return true;
419 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
420 return false;
421 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800422 throw new DatabaseException("removeIfValueMatches failed due to status: "
423 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800424 }
425
426 @Override
427 public void addMember(final ControllerNode node) {
428 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
429 node.tcpPort());
430 log.info("{} was added to the cluster", tcpMember);
431 synchronized (clusterConfig) {
432 clusterConfig.addRemoteMember(tcpMember);
433 }
Madan Jampani08822c42014-11-04 17:17:46 -0800434 }
435
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800436 private final class LeaderAdvertiser implements Runnable {
437
Madan Jampani5ce30252014-11-17 20:53:17 -0800438 @Override
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800439 public void run() {
Madan Jampani5ce30252014-11-17 20:53:17 -0800440 try {
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800441 LeaderElectEvent event = myLeaderEvent;
442 if (event != null) {
443 log.trace("Broadcasting RAFT_LEADER_ELECTION_EVENT: {}", event);
Madan Jampani5ce30252014-11-17 20:53:17 -0800444 // This node just became the leader.
445 clusterCommunicator.broadcastIncludeSelf(
446 new ClusterMessage(
447 clusterService.getLocalNode().id(),
448 RAFT_LEADER_ELECTION_EVENT,
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800449 ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
Madan Jampani5ce30252014-11-17 20:53:17 -0800450 }
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800451 } catch (Exception e) {
452 log.debug("LeaderAdvertiser failed with exception", e);
453 }
454 }
455
456 }
457
458 private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
459 @Override
460 public void handle(LeaderElectEvent event) {
461 try {
462 log.debug("Received LeaderElectEvent: {}", event);
463 if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
464 log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
465 myLeaderEvent = event;
466 // This node just became the leader.
467 clusterCommunicator.broadcastIncludeSelf(
468 new ClusterMessage(
469 clusterService.getLocalNode().id(),
470 RAFT_LEADER_ELECTION_EVENT,
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800471 ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800472 } else {
473 if (myLeaderEvent != null) {
474 log.debug("This node is no longer the Leader");
475 }
476 myLeaderEvent = null;
477 }
Madan Jampani5ce30252014-11-17 20:53:17 -0800478 } catch (IOException e) {
479 log.error("Failed to broadcast raft leadership change event", e);
480 }
481 }
482 }
483
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800484 private final class InternalClusterEventListener
Madan Jampani12390c12014-11-12 00:35:56 -0800485 implements ClusterEventListener {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800486
487 @Override
488 public void event(ClusterEvent event) {
489 // TODO: Not every node should be part of the consensus ring.
490
491 final ControllerNode node = event.subject();
492 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
Madan Jampani12390c12014-11-12 00:35:56 -0800493 node.tcpPort());
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800494
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800495 switch (event.type()) {
496 case INSTANCE_ACTIVATED:
497 case INSTANCE_ADDED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800498 if (autoAddMember) {
499 synchronized (clusterConfig) {
500 if (!clusterConfig.getMembers().contains(tcpMember)) {
501 log.info("{} was automatically added to the cluster", tcpMember);
502 clusterConfig.addRemoteMember(tcpMember);
503 }
504 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800505 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800506 break;
507 case INSTANCE_DEACTIVATED:
508 case INSTANCE_REMOVED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800509 if (autoAddMember) {
510 Set<DefaultControllerNode> members
Madan Jampani12390c12014-11-12 00:35:56 -0800511 = tabletMembers.getOrDefault(DEFAULT_TABLET,
512 Collections.emptySet());
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800513 // remove only if not the initial members
514 if (!members.contains(node)) {
515 synchronized (clusterConfig) {
516 if (clusterConfig.getMembers().contains(tcpMember)) {
517 log.info("{} was automatically removed from the cluster", tcpMember);
518 clusterConfig.removeRemoteMember(tcpMember);
519 }
520 }
521 }
522 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800523 break;
524 default:
525 break;
526 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800527 if (copycat != null) {
528 log.debug("Current cluster: {}", copycat.cluster());
529 }
530 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800531 }
532
533 }
534
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800535 @Override
536 public void removeMember(final ControllerNode node) {
537 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
538 node.tcpPort());
539 log.info("{} was removed from the cluster", tcpMember);
540 synchronized (clusterConfig) {
541 clusterConfig.removeRemoteMember(tcpMember);
542 }
543 }
544
545 @Override
546 public Collection<ControllerNode> listMembers() {
547 if (copycat == null) {
548 return ImmutableList.of();
549 }
550 Set<ControllerNode> members = new HashSet<>();
551 for (Member member : copycat.cluster().members()) {
552 if (member instanceof TcpMember) {
553 final TcpMember tcpMember = (TcpMember) member;
554 // TODO assuming tcpMember#host to be IP address,
555 // but if not lookup DNS, etc. first
556 IpAddress ip = IpAddress.valueOf(tcpMember.host());
557 int tcpPort = tcpMember.port();
558 NodeId id = getNodeIdFromIp(ip, tcpPort);
559 if (id == null) {
560 log.info("No NodeId found for {}:{}", ip, tcpPort);
561 continue;
562 }
563 members.add(new DefaultControllerNode(id, ip, tcpPort));
564 }
565 }
566 return members;
567 }
568
569 private NodeId getNodeIdFromIp(IpAddress ip, int tcpPort) {
570 for (ControllerNode node : clusterService.getNodes()) {
571 if (node.ip().equals(ip) &&
572 node.tcpPort() == tcpPort) {
573 return node.id();
574 }
575 }
576 return null;
577 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800578}