blob: af3f6ac7824191584ea294b5dbbeb92ad015e6ec [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08005import java.io.File;
6import java.io.IOException;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08007import java.util.Collection;
8import java.util.Collections;
9import java.util.HashSet;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080010import java.util.Map;
11import java.util.Set;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080012import java.util.concurrent.CountDownLatch;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080013import java.util.concurrent.ExecutionException;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080014import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080015
16import net.kuujo.copycat.Copycat;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080017import net.kuujo.copycat.cluster.ClusterConfig;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080018import net.kuujo.copycat.cluster.Member;
Madan Jampani08822c42014-11-04 17:17:46 -080019import net.kuujo.copycat.cluster.TcpCluster;
20import net.kuujo.copycat.cluster.TcpClusterConfig;
21import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani5ce30252014-11-17 20:53:17 -080022import net.kuujo.copycat.event.EventHandler;
Madan Jampanif5d263b2014-11-13 10:04:40 -080023import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampani08822c42014-11-04 17:17:46 -080024import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080025
Madan Jampani08822c42014-11-04 17:17:46 -080026import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080027import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080028import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080029import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080031import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080032import org.onlab.onos.cluster.ClusterEvent;
33import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080034import org.onlab.onos.cluster.ClusterService;
35import org.onlab.onos.cluster.ControllerNode;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080036import org.onlab.onos.cluster.DefaultControllerNode;
37import org.onlab.onos.cluster.NodeId;
Madan Jampanidef2c652014-11-12 13:50:10 -080038import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani5ce30252014-11-17 20:53:17 -080039import org.onlab.onos.store.cluster.messaging.ClusterMessage;
40import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampani12390c12014-11-12 00:35:56 -080041import org.onlab.onos.store.service.BatchReadRequest;
42import org.onlab.onos.store.service.BatchReadResult;
43import org.onlab.onos.store.service.BatchWriteRequest;
44import org.onlab.onos.store.service.BatchWriteResult;
Madan Jampani08822c42014-11-04 17:17:46 -080045import org.onlab.onos.store.service.DatabaseAdminService;
46import org.onlab.onos.store.service.DatabaseException;
47import org.onlab.onos.store.service.DatabaseService;
Madan Jampani08822c42014-11-04 17:17:46 -080048import org.onlab.onos.store.service.ReadResult;
Madan Jampani12390c12014-11-12 00:35:56 -080049import org.onlab.onos.store.service.ReadStatus;
50import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080051import org.onlab.onos.store.service.WriteResult;
Madan Jampani12390c12014-11-12 00:35:56 -080052import org.onlab.onos.store.service.WriteStatus;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080053import org.onlab.packet.IpAddress;
Madan Jampani08822c42014-11-04 17:17:46 -080054import org.slf4j.Logger;
55
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080056import com.google.common.collect.ImmutableList;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080057
Madan Jampani08822c42014-11-04 17:17:46 -080058/**
59 * Strongly consistent and durable state management service based on
60 * Copycat implementation of Raft consensus protocol.
61 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080062@Component(immediate = true)
63@Service
Madan Jampani08822c42014-11-04 17:17:46 -080064public class DatabaseManager implements DatabaseService, DatabaseAdminService {
65
66 private final Logger log = getLogger(getClass());
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080069 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080070
Madan Jampani9b19a822014-11-04 21:37:13 -080071 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanidef2c652014-11-12 13:50:10 -080072 protected ClusterCommunicationService clusterCommunicator;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080075 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080076
Yuta HIGUCHI13a6f5a2014-11-12 10:07:47 -080077 // FIXME: point to appropriate path
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080078 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log_";
79
80 // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
Pavlin Radoslavov190f8f92014-11-11 15:56:14 -080081 // TODO: Set the path to /opt/onos/config
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080082 private static final String CONFIG_DIR = "../config";
83
84 private static final String DEFAULT_MEMBER_FILE = "tablets.json";
85
86 private static final String DEFAULT_TABLET = "default";
87
88 // TODO: make this configurable
89 // initial member configuration file path
90 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
Madan Jampani08822c42014-11-04 17:17:46 -080091
Madan Jampani5ce30252014-11-17 20:53:17 -080092 public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
93 new MessageSubject("raft-leader-election-event");
94
Madan Jampani08822c42014-11-04 17:17:46 -080095 private Copycat copycat;
96 private DatabaseClient client;
97
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080098 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080099 private ClusterConfig<TcpMember> clusterConfig;
100
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800101 private CountDownLatch clusterEventLatch;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800102 private ClusterEventListener clusterEventListener;
103
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800104 private Map<String, Set<DefaultControllerNode>> tabletMembers;
105
106 private boolean autoAddMember = false;
107
Madan Jampani08822c42014-11-04 17:17:46 -0800108 @Activate
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800109 public void activate() throws InterruptedException, ExecutionException {
Madan Jampanidfbfa182014-11-04 22:06:41 -0800110
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800111 // load tablet configuration
112 File file = new File(CONFIG_DIR, initialMemberConfig);
113 log.info("Loading config: {}", file.getAbsolutePath());
114 TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
115 try {
116 tabletMembers = tabletDef.read();
117 } catch (IOException e) {
118 log.error("Failed to load tablet config {}", file);
119 throw new IllegalStateException("Failed to load tablet config", e);
120 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800121
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800122 // load default tablet configuration and start copycat
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800123 clusterConfig = new TcpClusterConfig();
Madan Jampani5ce30252014-11-17 20:53:17 -0800124 Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
125 if (defaultMembers == null || defaultMembers.isEmpty()) {
126 log.error("No members found in [{}] tablet configuration.",
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800127 DEFAULT_TABLET);
128 throw new IllegalStateException("No member found in tablet configuration");
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800129
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800130 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800131
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800132 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani5ce30252014-11-17 20:53:17 -0800133 for (ControllerNode member : defaultMembers) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800134 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
135 member.tcpPort());
136 if (localNode.equals(member)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800137 clusterConfig.setLocalMember(tcpMember);
138 } else {
139 clusterConfig.addRemoteMember(tcpMember);
140 }
141 }
142
Madan Jampani5ce30252014-11-17 20:53:17 -0800143 if (clusterConfig.getLocalMember() != null) {
144
145 // Wait for a minimum viable Raft cluster to boot up.
146 waitForClusterQuorum();
147
148 final TcpCluster cluster;
149 synchronized (clusterConfig) {
150 // Create the cluster.
151 cluster = new TcpCluster(clusterConfig);
152 }
153 log.info("Starting cluster: {}", cluster);
154
155 DatabaseEntryExpirationTracker expirationTracker =
156 new DatabaseEntryExpirationTracker(
157 clusterConfig.getLocalMember(),
158 clusterService.getLocalNode(),
159 clusterCommunicator,
160 this);
161
162 DatabaseStateMachine stateMachine = new DatabaseStateMachine();
163 stateMachine.addEventListener(expirationTracker);
164 Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
165 ClusterMessagingProtocol.SERIALIZER);
166
167 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
168 copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
169 copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
170 }
171
172 client = new DatabaseClient(copycatMessagingProtocol);
173 clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
174
175 // Starts copycat if this node is a participant
176 // of the Raft cluster.
177 if (copycat != null) {
178 copycat.start().get();
179 }
180
181 client.waitForLeader();
Madan Jampani71582ed2014-11-18 10:06:01 -0800182
183 // Try and list the tables to verify database manager is
184 // in a state where it can serve requests.
185 tryTableListing();
186
Madan Jampani5ce30252014-11-17 20:53:17 -0800187 log.info("Started.");
188 }
189
190 @Deactivate
191 public void deactivate() {
192 clusterService.removeListener(clusterEventListener);
193 // TODO: ClusterCommunicationService must support more than one
194 // handler per message subject.
195 clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
196 if (copycat != null) {
197 copycat.stop();
198 }
199 log.info("Stopped.");
200 }
201
202 private void waitForClusterQuorum() {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800203 // note: from this point beyond, clusterConfig requires synchronization
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800204 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800205 clusterEventListener = new InternalClusterEventListener();
206 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800207
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800208 if (clusterService.getNodes().size() < clusterConfig.getMembers().size()) {
209 // current cluster size smaller then expected
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800210 try {
211 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800212 log.info("Starting with {}/{} nodes cluster",
213 clusterService.getNodes().size(),
214 clusterConfig.getMembers().size());
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800215 }
216 } catch (InterruptedException e) {
217 log.info("Interrupted waiting for others", e);
218 }
219 }
Madan Jampani08822c42014-11-04 17:17:46 -0800220 }
221
Madan Jampani71582ed2014-11-18 10:06:01 -0800222 private void tryTableListing() {
223 int retries = 0;
224 do {
225 try {
226 listTables();
227 return;
228 } catch (DatabaseException e) {
229 if (retries == 10) {
230 log.error("Failed to listTables after multiple attempts. Giving up.", e);
231 throw e;
232 } else {
233 log.debug("Failed to listTables. Will retry...", e);
234 retries++;
235 }
236 }
237 } while (true);
238 }
239
Madan Jampani08822c42014-11-04 17:17:46 -0800240 @Override
241 public boolean createTable(String name) {
242 return client.createTable(name);
243 }
244
245 @Override
Madan Jampanidef2c652014-11-12 13:50:10 -0800246 public boolean createTable(String name, int ttlMillis) {
247 return client.createTable(name, ttlMillis);
248 }
249
250 @Override
Madan Jampani08822c42014-11-04 17:17:46 -0800251 public void dropTable(String name) {
252 client.dropTable(name);
253 }
254
255 @Override
256 public void dropAllTables() {
257 client.dropAllTables();
258 }
259
260 @Override
Madan Jampanif5d263b2014-11-13 10:04:40 -0800261 public Set<String> listTables() {
Madan Jampani08822c42014-11-04 17:17:46 -0800262 return client.listTables();
263 }
264
265 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800266 public VersionedValue get(String tableName, String key) {
267 BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
268 ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
269 if (readResult.status().equals(ReadStatus.OK)) {
270 return readResult.value();
Madan Jampani08822c42014-11-04 17:17:46 -0800271 }
Madan Jampani12390c12014-11-12 00:35:56 -0800272 throw new DatabaseException("get failed due to status: " + readResult.status());
Madan Jampani08822c42014-11-04 17:17:46 -0800273 }
274
275 @Override
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800276 public Map<String, VersionedValue> getAll(String tableName) {
277 return client.getAll(tableName);
278 }
279
280
281 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800282 public BatchReadResult batchRead(BatchReadRequest batchRequest) {
283 return new BatchReadResult(client.batchRead(batchRequest));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800284 }
285
286 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800287 public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
288 return new BatchWriteResult(client.batchWrite(batchRequest));
Madan Jampani08822c42014-11-04 17:17:46 -0800289 }
290
291 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800292 public VersionedValue put(String tableName, String key, byte[] value) {
293 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
294 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
295 if (writeResult.status().equals(WriteStatus.OK)) {
296 return writeResult.previousValue();
Madan Jampani08822c42014-11-04 17:17:46 -0800297 }
Madan Jampani12390c12014-11-12 00:35:56 -0800298 throw new DatabaseException("put failed due to status: " + writeResult.status());
299 }
Madan Jampani08822c42014-11-04 17:17:46 -0800300
Madan Jampani12390c12014-11-12 00:35:56 -0800301 @Override
302 public boolean putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800303 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
304 .putIfAbsent(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800305 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
306 if (writeResult.status().equals(WriteStatus.OK)) {
307 return true;
308 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
309 return false;
310 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800311 throw new DatabaseException("putIfAbsent failed due to status: "
312 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800313 }
314
315 @Override
316 public boolean putIfVersionMatches(String tableName, String key,
317 byte[] value, long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800318 BatchWriteRequest batchRequest =
319 new BatchWriteRequest.Builder()
320 .putIfVersionMatches(tableName, key, value, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800321 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
322 if (writeResult.status().equals(WriteStatus.OK)) {
323 return true;
324 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
325 return false;
326 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800327 throw new DatabaseException("putIfVersionMatches failed due to status: "
328 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800329 }
330
331 @Override
332 public boolean putIfValueMatches(String tableName, String key,
333 byte[] oldValue, byte[] newValue) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800334 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
335 .putIfValueMatches(tableName, key, oldValue, newValue).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800336 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
337 if (writeResult.status().equals(WriteStatus.OK)) {
338 return true;
339 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
340 return false;
341 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800342 throw new DatabaseException("putIfValueMatches failed due to status: "
343 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800344 }
345
346 @Override
347 public VersionedValue remove(String tableName, String key) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800348 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
349 .remove(tableName, key).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800350 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
351 if (writeResult.status().equals(WriteStatus.OK)) {
352 return writeResult.previousValue();
353 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800354 throw new DatabaseException("remove failed due to status: "
355 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800356 }
357
358 @Override
359 public boolean removeIfVersionMatches(String tableName, String key,
360 long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800361 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
362 .removeIfVersionMatches(tableName, key, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800363 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
364 if (writeResult.status().equals(WriteStatus.OK)) {
365 return true;
366 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
367 return false;
368 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800369 throw new DatabaseException("removeIfVersionMatches failed due to status: "
370 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800371 }
372
373 @Override
374 public boolean removeIfValueMatches(String tableName, String key,
375 byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800376 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
377 .removeIfValueMatches(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800378 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
379 if (writeResult.status().equals(WriteStatus.OK)) {
380 return true;
381 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
382 return false;
383 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800384 throw new DatabaseException("removeIfValueMatches failed due to status: "
385 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800386 }
387
388 @Override
389 public void addMember(final ControllerNode node) {
390 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
391 node.tcpPort());
392 log.info("{} was added to the cluster", tcpMember);
393 synchronized (clusterConfig) {
394 clusterConfig.addRemoteMember(tcpMember);
395 }
Madan Jampani08822c42014-11-04 17:17:46 -0800396 }
397
Madan Jampani5ce30252014-11-17 20:53:17 -0800398 private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
399 @Override
400 public void handle(LeaderElectEvent event) {
401 try {
402 if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
403 // This node just became the leader.
404 clusterCommunicator.broadcastIncludeSelf(
405 new ClusterMessage(
406 clusterService.getLocalNode().id(),
407 RAFT_LEADER_ELECTION_EVENT,
408 ClusterMessagingProtocol.SERIALIZER.encode(event)));
409 }
410 } catch (IOException e) {
411 log.error("Failed to broadcast raft leadership change event", e);
412 }
413 }
414 }
415
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800416 private final class InternalClusterEventListener
Madan Jampani12390c12014-11-12 00:35:56 -0800417 implements ClusterEventListener {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800418
419 @Override
420 public void event(ClusterEvent event) {
421 // TODO: Not every node should be part of the consensus ring.
422
423 final ControllerNode node = event.subject();
424 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
Madan Jampani12390c12014-11-12 00:35:56 -0800425 node.tcpPort());
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800426
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800427 switch (event.type()) {
428 case INSTANCE_ACTIVATED:
429 case INSTANCE_ADDED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800430 if (autoAddMember) {
431 synchronized (clusterConfig) {
432 if (!clusterConfig.getMembers().contains(tcpMember)) {
433 log.info("{} was automatically added to the cluster", tcpMember);
434 clusterConfig.addRemoteMember(tcpMember);
435 }
436 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800437 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800438 break;
439 case INSTANCE_DEACTIVATED:
440 case INSTANCE_REMOVED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800441 if (autoAddMember) {
442 Set<DefaultControllerNode> members
Madan Jampani12390c12014-11-12 00:35:56 -0800443 = tabletMembers.getOrDefault(DEFAULT_TABLET,
444 Collections.emptySet());
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800445 // remove only if not the initial members
446 if (!members.contains(node)) {
447 synchronized (clusterConfig) {
448 if (clusterConfig.getMembers().contains(tcpMember)) {
449 log.info("{} was automatically removed from the cluster", tcpMember);
450 clusterConfig.removeRemoteMember(tcpMember);
451 }
452 }
453 }
454 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800455 break;
456 default:
457 break;
458 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800459 if (copycat != null) {
460 log.debug("Current cluster: {}", copycat.cluster());
461 }
462 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800463 }
464
465 }
466
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800467 @Override
468 public void removeMember(final ControllerNode node) {
469 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
470 node.tcpPort());
471 log.info("{} was removed from the cluster", tcpMember);
472 synchronized (clusterConfig) {
473 clusterConfig.removeRemoteMember(tcpMember);
474 }
475 }
476
477 @Override
478 public Collection<ControllerNode> listMembers() {
479 if (copycat == null) {
480 return ImmutableList.of();
481 }
482 Set<ControllerNode> members = new HashSet<>();
483 for (Member member : copycat.cluster().members()) {
484 if (member instanceof TcpMember) {
485 final TcpMember tcpMember = (TcpMember) member;
486 // TODO assuming tcpMember#host to be IP address,
487 // but if not lookup DNS, etc. first
488 IpAddress ip = IpAddress.valueOf(tcpMember.host());
489 int tcpPort = tcpMember.port();
490 NodeId id = getNodeIdFromIp(ip, tcpPort);
491 if (id == null) {
492 log.info("No NodeId found for {}:{}", ip, tcpPort);
493 continue;
494 }
495 members.add(new DefaultControllerNode(id, ip, tcpPort));
496 }
497 }
498 return members;
499 }
500
501 private NodeId getNodeIdFromIp(IpAddress ip, int tcpPort) {
502 for (ControllerNode node : clusterService.getNodes()) {
503 if (node.ip().equals(ip) &&
504 node.tcpPort() == tcpPort) {
505 return node.id();
506 }
507 }
508 return null;
509 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800510}