blob: 35b6c98f131bac52507f99b098cb50c935db691e [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -08003import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
4import static org.onlab.util.Tools.namedThreads;
Madan Jampani08822c42014-11-04 17:17:46 -08005import static org.slf4j.LoggerFactory.getLogger;
6
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08007import java.io.File;
8import java.io.IOException;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08009import java.util.Collection;
10import java.util.Collections;
11import java.util.HashSet;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080012import java.util.Map;
13import java.util.Set;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080014import java.util.concurrent.CountDownLatch;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080015import java.util.concurrent.ExecutionException;
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -080016import java.util.concurrent.ScheduledExecutorService;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080017import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080018
19import net.kuujo.copycat.Copycat;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080020import net.kuujo.copycat.cluster.ClusterConfig;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080021import net.kuujo.copycat.cluster.Member;
Madan Jampani08822c42014-11-04 17:17:46 -080022import net.kuujo.copycat.cluster.TcpCluster;
23import net.kuujo.copycat.cluster.TcpClusterConfig;
24import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani5ce30252014-11-17 20:53:17 -080025import net.kuujo.copycat.event.EventHandler;
Madan Jampanif5d263b2014-11-13 10:04:40 -080026import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampani08822c42014-11-04 17:17:46 -080027import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080028
Madan Jampani08822c42014-11-04 17:17:46 -080029import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080030import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080031import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080032import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080034import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080035import org.onlab.onos.cluster.ClusterEvent;
36import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080037import org.onlab.onos.cluster.ClusterService;
38import org.onlab.onos.cluster.ControllerNode;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080039import org.onlab.onos.cluster.DefaultControllerNode;
40import org.onlab.onos.cluster.NodeId;
Madan Jampanidef2c652014-11-12 13:50:10 -080041import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani5ce30252014-11-17 20:53:17 -080042import org.onlab.onos.store.cluster.messaging.ClusterMessage;
43import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampani12390c12014-11-12 00:35:56 -080044import org.onlab.onos.store.service.BatchReadRequest;
45import org.onlab.onos.store.service.BatchReadResult;
46import org.onlab.onos.store.service.BatchWriteRequest;
47import org.onlab.onos.store.service.BatchWriteResult;
Madan Jampani08822c42014-11-04 17:17:46 -080048import org.onlab.onos.store.service.DatabaseAdminService;
49import org.onlab.onos.store.service.DatabaseException;
50import org.onlab.onos.store.service.DatabaseService;
Madan Jampani08822c42014-11-04 17:17:46 -080051import org.onlab.onos.store.service.ReadResult;
Madan Jampani12390c12014-11-12 00:35:56 -080052import org.onlab.onos.store.service.ReadStatus;
53import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080054import org.onlab.onos.store.service.WriteResult;
Madan Jampani12390c12014-11-12 00:35:56 -080055import org.onlab.onos.store.service.WriteStatus;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080056import org.onlab.packet.IpAddress;
Madan Jampani08822c42014-11-04 17:17:46 -080057import org.slf4j.Logger;
58
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080059import com.google.common.collect.ImmutableList;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080060
Madan Jampani08822c42014-11-04 17:17:46 -080061/**
62 * Strongly consistent and durable state management service based on
63 * Copycat implementation of Raft consensus protocol.
64 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080065@Component(immediate = true)
66@Service
Madan Jampani08822c42014-11-04 17:17:46 -080067public class DatabaseManager implements DatabaseService, DatabaseAdminService {
68
Yuta HIGUCHI28052222014-11-20 16:45:32 -080069 private static final int RETRY_MS = 500;
70
71 private static final int ACTIVATE_MAX_RETRIES = 100;
72
Madan Jampani08822c42014-11-04 17:17:46 -080073 private final Logger log = getLogger(getClass());
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080076 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080077
Madan Jampani9b19a822014-11-04 21:37:13 -080078 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanidef2c652014-11-12 13:50:10 -080079 protected ClusterCommunicationService clusterCommunicator;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080082 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080083
Yuta HIGUCHI13a6f5a2014-11-12 10:07:47 -080084 // FIXME: point to appropriate path
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080085 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log_";
86
87 // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
Pavlin Radoslavov190f8f92014-11-11 15:56:14 -080088 // TODO: Set the path to /opt/onos/config
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080089 private static final String CONFIG_DIR = "../config";
90
91 private static final String DEFAULT_MEMBER_FILE = "tablets.json";
92
93 private static final String DEFAULT_TABLET = "default";
94
95 // TODO: make this configurable
96 // initial member configuration file path
97 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
Madan Jampani08822c42014-11-04 17:17:46 -080098
Madan Jampani5ce30252014-11-17 20:53:17 -080099 public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
100 new MessageSubject("raft-leader-election-event");
101
Madan Jampani08822c42014-11-04 17:17:46 -0800102 private Copycat copycat;
103 private DatabaseClient client;
104
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800105 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800106 private ClusterConfig<TcpMember> clusterConfig;
107
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800108 private CountDownLatch clusterEventLatch;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800109 private ClusterEventListener clusterEventListener;
110
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800111 private Map<String, Set<DefaultControllerNode>> tabletMembers;
112
113 private boolean autoAddMember = false;
114
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800115 private ScheduledExecutorService executor;
116
117 private volatile LeaderElectEvent myLeaderEvent = null;
118
Madan Jampani08822c42014-11-04 17:17:46 -0800119 @Activate
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800120 public void activate() throws InterruptedException, ExecutionException {
Madan Jampanidfbfa182014-11-04 22:06:41 -0800121
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800122 // load tablet configuration
123 File file = new File(CONFIG_DIR, initialMemberConfig);
124 log.info("Loading config: {}", file.getAbsolutePath());
125 TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
126 try {
127 tabletMembers = tabletDef.read();
128 } catch (IOException e) {
129 log.error("Failed to load tablet config {}", file);
130 throw new IllegalStateException("Failed to load tablet config", e);
131 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800132
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800133 // load default tablet configuration and start copycat
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800134 clusterConfig = new TcpClusterConfig();
Madan Jampani5ce30252014-11-17 20:53:17 -0800135 Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
136 if (defaultMembers == null || defaultMembers.isEmpty()) {
137 log.error("No members found in [{}] tablet configuration.",
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800138 DEFAULT_TABLET);
139 throw new IllegalStateException("No member found in tablet configuration");
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800140
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800141 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800142
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800143 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani5ce30252014-11-17 20:53:17 -0800144 for (ControllerNode member : defaultMembers) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800145 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
146 member.tcpPort());
147 if (localNode.equals(member)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800148 clusterConfig.setLocalMember(tcpMember);
149 } else {
150 clusterConfig.addRemoteMember(tcpMember);
151 }
152 }
153
Madan Jampani5ce30252014-11-17 20:53:17 -0800154 if (clusterConfig.getLocalMember() != null) {
155
156 // Wait for a minimum viable Raft cluster to boot up.
157 waitForClusterQuorum();
158
159 final TcpCluster cluster;
160 synchronized (clusterConfig) {
161 // Create the cluster.
162 cluster = new TcpCluster(clusterConfig);
163 }
164 log.info("Starting cluster: {}", cluster);
165
166 DatabaseEntryExpirationTracker expirationTracker =
167 new DatabaseEntryExpirationTracker(
168 clusterConfig.getLocalMember(),
169 clusterService.getLocalNode(),
170 clusterCommunicator,
171 this);
172
173 DatabaseStateMachine stateMachine = new DatabaseStateMachine();
174 stateMachine.addEventListener(expirationTracker);
175 Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800176 ClusterMessagingProtocol.DB_SERIALIZER);
Madan Jampani5ce30252014-11-17 20:53:17 -0800177
178 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
179 copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
180 copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
181 }
182
183 client = new DatabaseClient(copycatMessagingProtocol);
184 clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
185
186 // Starts copycat if this node is a participant
187 // of the Raft cluster.
188 if (copycat != null) {
189 copycat.start().get();
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800190
191 executor =
192 newSingleThreadScheduledExecutor(namedThreads("db-heartbeat-%d"));
193 executor.scheduleWithFixedDelay(new LeaderAdvertiser(), 5, 2, TimeUnit.SECONDS);
194
Madan Jampani5ce30252014-11-17 20:53:17 -0800195 }
196
197 client.waitForLeader();
Madan Jampani71582ed2014-11-18 10:06:01 -0800198
199 // Try and list the tables to verify database manager is
200 // in a state where it can serve requests.
201 tryTableListing();
202
Madan Jampani5ce30252014-11-17 20:53:17 -0800203 log.info("Started.");
204 }
205
206 @Deactivate
207 public void deactivate() {
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800208 if (executor != null) {
209 executor.shutdownNow();
210 }
Madan Jampani5ce30252014-11-17 20:53:17 -0800211 clusterService.removeListener(clusterEventListener);
212 // TODO: ClusterCommunicationService must support more than one
213 // handler per message subject.
214 clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
215 if (copycat != null) {
216 copycat.stop();
217 }
218 log.info("Stopped.");
219 }
220
221 private void waitForClusterQuorum() {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800222 // note: from this point beyond, clusterConfig requires synchronization
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800223 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800224 clusterEventListener = new InternalClusterEventListener();
225 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800226
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800227 final int raftClusterSize = clusterConfig.getMembers().size();
228 final int raftClusterQuorumSize = (int) (Math.floor(raftClusterSize / 2)) + 1;
229 if (clusterService.getNodes().size() < raftClusterQuorumSize) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800230 // current cluster size smaller then expected
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800231 try {
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800232 final int waitTimeSec = 120;
233 log.info("Waiting for a maximum of {}s for raft cluster quorum to boot up...", waitTimeSec);
234 if (!clusterEventLatch.await(waitTimeSec, TimeUnit.SECONDS)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800235 log.info("Starting with {}/{} nodes cluster",
236 clusterService.getNodes().size(),
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800237 raftClusterSize);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800238 }
239 } catch (InterruptedException e) {
Pavlin Radoslavovaf529d82014-11-22 21:03:19 -0800240 log.info("Interrupted waiting for raft quorum.", e);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800241 }
242 }
Madan Jampani08822c42014-11-04 17:17:46 -0800243 }
244
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800245 private void tryTableListing() throws InterruptedException {
Madan Jampani71582ed2014-11-18 10:06:01 -0800246 int retries = 0;
247 do {
248 try {
249 listTables();
250 return;
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800251 } catch (DatabaseException.Timeout e) {
252 log.debug("Failed to listTables. Will retry...", e);
Madan Jampani71582ed2014-11-18 10:06:01 -0800253 } catch (DatabaseException e) {
Yuta HIGUCHI28052222014-11-20 16:45:32 -0800254 log.debug("Failed to listTables. Will retry later...", e);
255 Thread.sleep(RETRY_MS);
256 } finally {
257 if (retries == ACTIVATE_MAX_RETRIES) {
258 log.error("Failed to listTables after multiple attempts. Giving up.");
259 // Exiting hoping things will be fixed by the time
260 // others start using the service
261 return;
Madan Jampani71582ed2014-11-18 10:06:01 -0800262 }
263 }
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800264 retries++;
Madan Jampani71582ed2014-11-18 10:06:01 -0800265 } while (true);
266 }
267
Madan Jampani08822c42014-11-04 17:17:46 -0800268 @Override
269 public boolean createTable(String name) {
270 return client.createTable(name);
271 }
272
273 @Override
Madan Jampanidef2c652014-11-12 13:50:10 -0800274 public boolean createTable(String name, int ttlMillis) {
275 return client.createTable(name, ttlMillis);
276 }
277
278 @Override
Madan Jampani08822c42014-11-04 17:17:46 -0800279 public void dropTable(String name) {
280 client.dropTable(name);
281 }
282
283 @Override
284 public void dropAllTables() {
285 client.dropAllTables();
286 }
287
288 @Override
Madan Jampanif5d263b2014-11-13 10:04:40 -0800289 public Set<String> listTables() {
Madan Jampani08822c42014-11-04 17:17:46 -0800290 return client.listTables();
291 }
292
293 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800294 public VersionedValue get(String tableName, String key) {
295 BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
296 ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
297 if (readResult.status().equals(ReadStatus.OK)) {
298 return readResult.value();
Madan Jampani08822c42014-11-04 17:17:46 -0800299 }
Madan Jampani12390c12014-11-12 00:35:56 -0800300 throw new DatabaseException("get failed due to status: " + readResult.status());
Madan Jampani08822c42014-11-04 17:17:46 -0800301 }
302
303 @Override
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800304 public Map<String, VersionedValue> getAll(String tableName) {
305 return client.getAll(tableName);
306 }
307
308
309 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800310 public BatchReadResult batchRead(BatchReadRequest batchRequest) {
311 return new BatchReadResult(client.batchRead(batchRequest));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800312 }
313
314 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800315 public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
316 return new BatchWriteResult(client.batchWrite(batchRequest));
Madan Jampani08822c42014-11-04 17:17:46 -0800317 }
318
319 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800320 public VersionedValue put(String tableName, String key, byte[] value) {
321 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
322 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
323 if (writeResult.status().equals(WriteStatus.OK)) {
324 return writeResult.previousValue();
Madan Jampani08822c42014-11-04 17:17:46 -0800325 }
Madan Jampani12390c12014-11-12 00:35:56 -0800326 throw new DatabaseException("put failed due to status: " + writeResult.status());
327 }
Madan Jampani08822c42014-11-04 17:17:46 -0800328
Madan Jampani12390c12014-11-12 00:35:56 -0800329 @Override
330 public boolean putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800331 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
332 .putIfAbsent(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800333 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
334 if (writeResult.status().equals(WriteStatus.OK)) {
335 return true;
336 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
337 return false;
338 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800339 throw new DatabaseException("putIfAbsent failed due to status: "
340 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800341 }
342
343 @Override
344 public boolean putIfVersionMatches(String tableName, String key,
345 byte[] value, long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800346 BatchWriteRequest batchRequest =
347 new BatchWriteRequest.Builder()
348 .putIfVersionMatches(tableName, key, value, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800349 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
350 if (writeResult.status().equals(WriteStatus.OK)) {
351 return true;
352 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
353 return false;
354 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800355 throw new DatabaseException("putIfVersionMatches failed due to status: "
356 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800357 }
358
359 @Override
360 public boolean putIfValueMatches(String tableName, String key,
361 byte[] oldValue, byte[] newValue) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800362 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
363 .putIfValueMatches(tableName, key, oldValue, newValue).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800364 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
365 if (writeResult.status().equals(WriteStatus.OK)) {
366 return true;
367 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
368 return false;
369 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800370 throw new DatabaseException("putIfValueMatches failed due to status: "
371 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800372 }
373
374 @Override
375 public VersionedValue remove(String tableName, String key) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800376 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
377 .remove(tableName, key).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800378 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
379 if (writeResult.status().equals(WriteStatus.OK)) {
380 return writeResult.previousValue();
381 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800382 throw new DatabaseException("remove failed due to status: "
383 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800384 }
385
386 @Override
387 public boolean removeIfVersionMatches(String tableName, String key,
388 long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800389 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
390 .removeIfVersionMatches(tableName, key, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800391 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
392 if (writeResult.status().equals(WriteStatus.OK)) {
393 return true;
394 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
395 return false;
396 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800397 throw new DatabaseException("removeIfVersionMatches failed due to status: "
398 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800399 }
400
401 @Override
402 public boolean removeIfValueMatches(String tableName, String key,
403 byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800404 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
405 .removeIfValueMatches(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800406 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
407 if (writeResult.status().equals(WriteStatus.OK)) {
408 return true;
409 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
410 return false;
411 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800412 throw new DatabaseException("removeIfValueMatches failed due to status: "
413 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800414 }
415
416 @Override
417 public void addMember(final ControllerNode node) {
418 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
419 node.tcpPort());
420 log.info("{} was added to the cluster", tcpMember);
421 synchronized (clusterConfig) {
422 clusterConfig.addRemoteMember(tcpMember);
423 }
Madan Jampani08822c42014-11-04 17:17:46 -0800424 }
425
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800426 private final class LeaderAdvertiser implements Runnable {
427
Madan Jampani5ce30252014-11-17 20:53:17 -0800428 @Override
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800429 public void run() {
Madan Jampani5ce30252014-11-17 20:53:17 -0800430 try {
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800431 LeaderElectEvent event = myLeaderEvent;
432 if (event != null) {
433 log.trace("Broadcasting RAFT_LEADER_ELECTION_EVENT: {}", event);
Madan Jampani5ce30252014-11-17 20:53:17 -0800434 // This node just became the leader.
435 clusterCommunicator.broadcastIncludeSelf(
436 new ClusterMessage(
437 clusterService.getLocalNode().id(),
438 RAFT_LEADER_ELECTION_EVENT,
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800439 ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
Madan Jampani5ce30252014-11-17 20:53:17 -0800440 }
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800441 } catch (Exception e) {
442 log.debug("LeaderAdvertiser failed with exception", e);
443 }
444 }
445
446 }
447
448 private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
449 @Override
450 public void handle(LeaderElectEvent event) {
451 try {
452 log.debug("Received LeaderElectEvent: {}", event);
453 if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
454 log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
455 myLeaderEvent = event;
456 // This node just became the leader.
457 clusterCommunicator.broadcastIncludeSelf(
458 new ClusterMessage(
459 clusterService.getLocalNode().id(),
460 RAFT_LEADER_ELECTION_EVENT,
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800461 ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800462 } else {
463 if (myLeaderEvent != null) {
464 log.debug("This node is no longer the Leader");
465 }
466 myLeaderEvent = null;
467 }
Madan Jampani5ce30252014-11-17 20:53:17 -0800468 } catch (IOException e) {
469 log.error("Failed to broadcast raft leadership change event", e);
470 }
471 }
472 }
473
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800474 private final class InternalClusterEventListener
Madan Jampani12390c12014-11-12 00:35:56 -0800475 implements ClusterEventListener {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800476
477 @Override
478 public void event(ClusterEvent event) {
479 // TODO: Not every node should be part of the consensus ring.
480
481 final ControllerNode node = event.subject();
482 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
Madan Jampani12390c12014-11-12 00:35:56 -0800483 node.tcpPort());
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800484
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800485 switch (event.type()) {
486 case INSTANCE_ACTIVATED:
487 case INSTANCE_ADDED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800488 if (autoAddMember) {
489 synchronized (clusterConfig) {
490 if (!clusterConfig.getMembers().contains(tcpMember)) {
491 log.info("{} was automatically added to the cluster", tcpMember);
492 clusterConfig.addRemoteMember(tcpMember);
493 }
494 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800495 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800496 break;
497 case INSTANCE_DEACTIVATED:
498 case INSTANCE_REMOVED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800499 if (autoAddMember) {
500 Set<DefaultControllerNode> members
Madan Jampani12390c12014-11-12 00:35:56 -0800501 = tabletMembers.getOrDefault(DEFAULT_TABLET,
502 Collections.emptySet());
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800503 // remove only if not the initial members
504 if (!members.contains(node)) {
505 synchronized (clusterConfig) {
506 if (clusterConfig.getMembers().contains(tcpMember)) {
507 log.info("{} was automatically removed from the cluster", tcpMember);
508 clusterConfig.removeRemoteMember(tcpMember);
509 }
510 }
511 }
512 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800513 break;
514 default:
515 break;
516 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800517 if (copycat != null) {
518 log.debug("Current cluster: {}", copycat.cluster());
519 }
520 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800521 }
522
523 }
524
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800525 @Override
526 public void removeMember(final ControllerNode node) {
527 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
528 node.tcpPort());
529 log.info("{} was removed from the cluster", tcpMember);
530 synchronized (clusterConfig) {
531 clusterConfig.removeRemoteMember(tcpMember);
532 }
533 }
534
535 @Override
536 public Collection<ControllerNode> listMembers() {
537 if (copycat == null) {
538 return ImmutableList.of();
539 }
540 Set<ControllerNode> members = new HashSet<>();
541 for (Member member : copycat.cluster().members()) {
542 if (member instanceof TcpMember) {
543 final TcpMember tcpMember = (TcpMember) member;
544 // TODO assuming tcpMember#host to be IP address,
545 // but if not lookup DNS, etc. first
546 IpAddress ip = IpAddress.valueOf(tcpMember.host());
547 int tcpPort = tcpMember.port();
548 NodeId id = getNodeIdFromIp(ip, tcpPort);
549 if (id == null) {
550 log.info("No NodeId found for {}:{}", ip, tcpPort);
551 continue;
552 }
553 members.add(new DefaultControllerNode(id, ip, tcpPort));
554 }
555 }
556 return members;
557 }
558
559 private NodeId getNodeIdFromIp(IpAddress ip, int tcpPort) {
560 for (ControllerNode node : clusterService.getNodes()) {
561 if (node.ip().equals(ip) &&
562 node.tcpPort() == tcpPort) {
563 return node.id();
564 }
565 }
566 return null;
567 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800568}