blob: 6fcd6b97a457d86cc8a83c9d3e3c4a432fd44e8b [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08005import java.io.File;
6import java.io.IOException;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08007import java.util.Collection;
8import java.util.Collections;
9import java.util.HashSet;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080010import java.util.Map;
11import java.util.Set;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080012import java.util.concurrent.CountDownLatch;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080013import java.util.concurrent.ExecutionException;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080014import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080015
16import net.kuujo.copycat.Copycat;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080017import net.kuujo.copycat.cluster.ClusterConfig;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080018import net.kuujo.copycat.cluster.Member;
Madan Jampani08822c42014-11-04 17:17:46 -080019import net.kuujo.copycat.cluster.TcpCluster;
20import net.kuujo.copycat.cluster.TcpClusterConfig;
21import net.kuujo.copycat.cluster.TcpMember;
Madan Jampanif5d263b2014-11-13 10:04:40 -080022import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampani08822c42014-11-04 17:17:46 -080023import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080024
Madan Jampani08822c42014-11-04 17:17:46 -080025import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080026import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080027import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080028import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080030import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080031import org.onlab.onos.cluster.ClusterEvent;
32import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080033import org.onlab.onos.cluster.ClusterService;
34import org.onlab.onos.cluster.ControllerNode;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080035import org.onlab.onos.cluster.DefaultControllerNode;
36import org.onlab.onos.cluster.NodeId;
Madan Jampanidef2c652014-11-12 13:50:10 -080037import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani12390c12014-11-12 00:35:56 -080038import org.onlab.onos.store.service.BatchReadRequest;
39import org.onlab.onos.store.service.BatchReadResult;
40import org.onlab.onos.store.service.BatchWriteRequest;
41import org.onlab.onos.store.service.BatchWriteResult;
Madan Jampani08822c42014-11-04 17:17:46 -080042import org.onlab.onos.store.service.DatabaseAdminService;
43import org.onlab.onos.store.service.DatabaseException;
44import org.onlab.onos.store.service.DatabaseService;
Madan Jampani08822c42014-11-04 17:17:46 -080045import org.onlab.onos.store.service.ReadResult;
Madan Jampani12390c12014-11-12 00:35:56 -080046import org.onlab.onos.store.service.ReadStatus;
47import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080048import org.onlab.onos.store.service.WriteResult;
Madan Jampani12390c12014-11-12 00:35:56 -080049import org.onlab.onos.store.service.WriteStatus;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080050import org.onlab.packet.IpAddress;
Madan Jampani08822c42014-11-04 17:17:46 -080051import org.slf4j.Logger;
52
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080053import com.google.common.collect.ImmutableList;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080054
Madan Jampani08822c42014-11-04 17:17:46 -080055/**
56 * Strongly consistent and durable state management service based on
57 * Copycat implementation of Raft consensus protocol.
58 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080059@Component(immediate = true)
60@Service
Madan Jampani08822c42014-11-04 17:17:46 -080061public class DatabaseManager implements DatabaseService, DatabaseAdminService {
62
63 private final Logger log = getLogger(getClass());
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080066 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080067
Madan Jampani9b19a822014-11-04 21:37:13 -080068 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanidef2c652014-11-12 13:50:10 -080069 protected ClusterCommunicationService clusterCommunicator;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080072 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080073
Yuta HIGUCHI13a6f5a2014-11-12 10:07:47 -080074 // FIXME: point to appropriate path
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080075 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log_";
76
77 // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
Pavlin Radoslavov190f8f92014-11-11 15:56:14 -080078 // TODO: Set the path to /opt/onos/config
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080079 private static final String CONFIG_DIR = "../config";
80
81 private static final String DEFAULT_MEMBER_FILE = "tablets.json";
82
83 private static final String DEFAULT_TABLET = "default";
84
85 // TODO: make this configurable
86 // initial member configuration file path
87 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
Madan Jampani08822c42014-11-04 17:17:46 -080088
89 private Copycat copycat;
90 private DatabaseClient client;
91
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080092 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080093 private ClusterConfig<TcpMember> clusterConfig;
94
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080095 private CountDownLatch clusterEventLatch;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080096 private ClusterEventListener clusterEventListener;
97
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080098 private Map<String, Set<DefaultControllerNode>> tabletMembers;
99
100 private boolean autoAddMember = false;
101
Madan Jampani08822c42014-11-04 17:17:46 -0800102 @Activate
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800103 public void activate() throws InterruptedException, ExecutionException {
Madan Jampanidfbfa182014-11-04 22:06:41 -0800104
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800105 // TODO: Not every node should be part of the consensus ring.
Madan Jampanidfbfa182014-11-04 22:06:41 -0800106
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800107 // load tablet configuration
108 File file = new File(CONFIG_DIR, initialMemberConfig);
109 log.info("Loading config: {}", file.getAbsolutePath());
110 TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
111 try {
112 tabletMembers = tabletDef.read();
113 } catch (IOException e) {
114 log.error("Failed to load tablet config {}", file);
115 throw new IllegalStateException("Failed to load tablet config", e);
116 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800117
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800118 // load default tablet configuration and start copycat
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800119 clusterConfig = new TcpClusterConfig();
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800120 Set<DefaultControllerNode> defaultMember = tabletMembers.get(DEFAULT_TABLET);
121 if (defaultMember == null || defaultMember.isEmpty()) {
122 log.error("No member found in [{}] tablet configuration.",
123 DEFAULT_TABLET);
124 throw new IllegalStateException("No member found in tablet configuration");
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800125
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800126 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800127
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800128 final ControllerNode localNode = clusterService.getLocalNode();
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800129 for (ControllerNode member : defaultMember) {
130 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
131 member.tcpPort());
132 if (localNode.equals(member)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800133 clusterConfig.setLocalMember(tcpMember);
134 } else {
135 clusterConfig.addRemoteMember(tcpMember);
136 }
137 }
138
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800139 // note: from this point beyond, clusterConfig requires synchronization
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800140 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800141 clusterEventListener = new InternalClusterEventListener();
142 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800143
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800144 if (clusterService.getNodes().size() < clusterConfig.getMembers().size()) {
145 // current cluster size smaller then expected
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800146 try {
147 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800148 log.info("Starting with {}/{} nodes cluster",
149 clusterService.getNodes().size(),
150 clusterConfig.getMembers().size());
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800151 }
152 } catch (InterruptedException e) {
153 log.info("Interrupted waiting for others", e);
154 }
155 }
Madan Jampani08822c42014-11-04 17:17:46 -0800156
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800157 final TcpCluster cluster;
158 synchronized (clusterConfig) {
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800159 // Create the cluster.
160 cluster = new TcpCluster(clusterConfig);
161 }
162 log.info("Starting cluster: {}", cluster);
163
Madan Jampanif5d263b2014-11-13 10:04:40 -0800164 DatabaseEntryExpirationTracker expirationTracker =
165 new DatabaseEntryExpirationTracker(
166 clusterConfig.getLocalMember(),
167 clusterService.getLocalNode(),
168 clusterCommunicator,
169 this);
Madan Jampani08822c42014-11-04 17:17:46 -0800170
Madan Jampanidef2c652014-11-12 13:50:10 -0800171 DatabaseStateMachine stateMachine = new DatabaseStateMachine();
Madan Jampanif5d263b2014-11-13 10:04:40 -0800172 stateMachine.addEventListener(expirationTracker);
Madan Jampani2ee20002014-11-06 20:06:12 -0800173 Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800174 ClusterMessagingProtocol.SERIALIZER);
Madan Jampani08822c42014-11-04 17:17:46 -0800175
Yuta HIGUCHI2fe63342014-11-17 21:33:38 -0800176 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
177
Madan Jampania88d1f52014-11-14 16:45:24 -0800178 client = new DatabaseClient(copycatMessagingProtocol);
179
Madan Jampanif5d263b2014-11-13 10:04:40 -0800180
Madan Jampania88d1f52014-11-14 16:45:24 -0800181 copycat.event(LeaderElectEvent.class).registerHandler(client);
Madan Jampanif5d263b2014-11-13 10:04:40 -0800182 copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
183
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800184 copycat.start().get();
Madan Jampani08822c42014-11-04 17:17:46 -0800185
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800186 client.waitForLeader();
Madan Jampani08822c42014-11-04 17:17:46 -0800187
188 log.info("Started.");
189 }
190
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800191 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800192 public void deactivate() {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800193 clusterService.removeListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800194 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800195 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800196 }
197
198 @Override
199 public boolean createTable(String name) {
200 return client.createTable(name);
201 }
202
203 @Override
Madan Jampanidef2c652014-11-12 13:50:10 -0800204 public boolean createTable(String name, int ttlMillis) {
205 return client.createTable(name, ttlMillis);
206 }
207
208 @Override
Madan Jampani08822c42014-11-04 17:17:46 -0800209 public void dropTable(String name) {
210 client.dropTable(name);
211 }
212
213 @Override
214 public void dropAllTables() {
215 client.dropAllTables();
216 }
217
218 @Override
Madan Jampanif5d263b2014-11-13 10:04:40 -0800219 public Set<String> listTables() {
Madan Jampani08822c42014-11-04 17:17:46 -0800220 return client.listTables();
221 }
222
223 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800224 public VersionedValue get(String tableName, String key) {
225 BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
226 ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
227 if (readResult.status().equals(ReadStatus.OK)) {
228 return readResult.value();
Madan Jampani08822c42014-11-04 17:17:46 -0800229 }
Madan Jampani12390c12014-11-12 00:35:56 -0800230 throw new DatabaseException("get failed due to status: " + readResult.status());
Madan Jampani08822c42014-11-04 17:17:46 -0800231 }
232
233 @Override
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800234 public Map<String, VersionedValue> getAll(String tableName) {
235 return client.getAll(tableName);
236 }
237
238
239 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800240 public BatchReadResult batchRead(BatchReadRequest batchRequest) {
241 return new BatchReadResult(client.batchRead(batchRequest));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800242 }
243
244 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800245 public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
246 return new BatchWriteResult(client.batchWrite(batchRequest));
Madan Jampani08822c42014-11-04 17:17:46 -0800247 }
248
249 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800250 public VersionedValue put(String tableName, String key, byte[] value) {
251 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
252 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
253 if (writeResult.status().equals(WriteStatus.OK)) {
254 return writeResult.previousValue();
Madan Jampani08822c42014-11-04 17:17:46 -0800255 }
Madan Jampani12390c12014-11-12 00:35:56 -0800256 throw new DatabaseException("put failed due to status: " + writeResult.status());
257 }
Madan Jampani08822c42014-11-04 17:17:46 -0800258
Madan Jampani12390c12014-11-12 00:35:56 -0800259 @Override
260 public boolean putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800261 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
262 .putIfAbsent(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800263 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
264 if (writeResult.status().equals(WriteStatus.OK)) {
265 return true;
266 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
267 return false;
268 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800269 throw new DatabaseException("putIfAbsent failed due to status: "
270 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800271 }
272
273 @Override
274 public boolean putIfVersionMatches(String tableName, String key,
275 byte[] value, long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800276 BatchWriteRequest batchRequest =
277 new BatchWriteRequest.Builder()
278 .putIfVersionMatches(tableName, key, value, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800279 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
280 if (writeResult.status().equals(WriteStatus.OK)) {
281 return true;
282 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
283 return false;
284 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800285 throw new DatabaseException("putIfVersionMatches failed due to status: "
286 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800287 }
288
289 @Override
290 public boolean putIfValueMatches(String tableName, String key,
291 byte[] oldValue, byte[] newValue) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800292 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
293 .putIfValueMatches(tableName, key, oldValue, newValue).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800294 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
295 if (writeResult.status().equals(WriteStatus.OK)) {
296 return true;
297 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
298 return false;
299 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800300 throw new DatabaseException("putIfValueMatches failed due to status: "
301 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800302 }
303
304 @Override
305 public VersionedValue remove(String tableName, String key) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800306 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
307 .remove(tableName, key).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800308 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
309 if (writeResult.status().equals(WriteStatus.OK)) {
310 return writeResult.previousValue();
311 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800312 throw new DatabaseException("remove failed due to status: "
313 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800314 }
315
316 @Override
317 public boolean removeIfVersionMatches(String tableName, String key,
318 long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800319 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
320 .removeIfVersionMatches(tableName, key, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800321 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
322 if (writeResult.status().equals(WriteStatus.OK)) {
323 return true;
324 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
325 return false;
326 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800327 throw new DatabaseException("removeIfVersionMatches failed due to status: "
328 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800329 }
330
331 @Override
332 public boolean removeIfValueMatches(String tableName, String key,
333 byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800334 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
335 .removeIfValueMatches(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800336 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
337 if (writeResult.status().equals(WriteStatus.OK)) {
338 return true;
339 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
340 return false;
341 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800342 throw new DatabaseException("removeIfValueMatches failed due to status: "
343 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800344 }
345
346 @Override
347 public void addMember(final ControllerNode node) {
348 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
349 node.tcpPort());
350 log.info("{} was added to the cluster", tcpMember);
351 synchronized (clusterConfig) {
352 clusterConfig.addRemoteMember(tcpMember);
353 }
Madan Jampani08822c42014-11-04 17:17:46 -0800354 }
355
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800356 private final class InternalClusterEventListener
Madan Jampani12390c12014-11-12 00:35:56 -0800357 implements ClusterEventListener {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800358
359 @Override
360 public void event(ClusterEvent event) {
361 // TODO: Not every node should be part of the consensus ring.
362
363 final ControllerNode node = event.subject();
364 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
Madan Jampani12390c12014-11-12 00:35:56 -0800365 node.tcpPort());
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800366
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800367 switch (event.type()) {
368 case INSTANCE_ACTIVATED:
369 case INSTANCE_ADDED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800370 if (autoAddMember) {
371 synchronized (clusterConfig) {
372 if (!clusterConfig.getMembers().contains(tcpMember)) {
373 log.info("{} was automatically added to the cluster", tcpMember);
374 clusterConfig.addRemoteMember(tcpMember);
375 }
376 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800377 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800378 break;
379 case INSTANCE_DEACTIVATED:
380 case INSTANCE_REMOVED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800381 if (autoAddMember) {
382 Set<DefaultControllerNode> members
Madan Jampani12390c12014-11-12 00:35:56 -0800383 = tabletMembers.getOrDefault(DEFAULT_TABLET,
384 Collections.emptySet());
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800385 // remove only if not the initial members
386 if (!members.contains(node)) {
387 synchronized (clusterConfig) {
388 if (clusterConfig.getMembers().contains(tcpMember)) {
389 log.info("{} was automatically removed from the cluster", tcpMember);
390 clusterConfig.removeRemoteMember(tcpMember);
391 }
392 }
393 }
394 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800395 break;
396 default:
397 break;
398 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800399 if (copycat != null) {
400 log.debug("Current cluster: {}", copycat.cluster());
401 }
402 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800403 }
404
405 }
406
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800407 @Override
408 public void removeMember(final ControllerNode node) {
409 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
410 node.tcpPort());
411 log.info("{} was removed from the cluster", tcpMember);
412 synchronized (clusterConfig) {
413 clusterConfig.removeRemoteMember(tcpMember);
414 }
415 }
416
417 @Override
418 public Collection<ControllerNode> listMembers() {
419 if (copycat == null) {
420 return ImmutableList.of();
421 }
422 Set<ControllerNode> members = new HashSet<>();
423 for (Member member : copycat.cluster().members()) {
424 if (member instanceof TcpMember) {
425 final TcpMember tcpMember = (TcpMember) member;
426 // TODO assuming tcpMember#host to be IP address,
427 // but if not lookup DNS, etc. first
428 IpAddress ip = IpAddress.valueOf(tcpMember.host());
429 int tcpPort = tcpMember.port();
430 NodeId id = getNodeIdFromIp(ip, tcpPort);
431 if (id == null) {
432 log.info("No NodeId found for {}:{}", ip, tcpPort);
433 continue;
434 }
435 members.add(new DefaultControllerNode(id, ip, tcpPort));
436 }
437 }
438 return members;
439 }
440
441 private NodeId getNodeIdFromIp(IpAddress ip, int tcpPort) {
442 for (ControllerNode node : clusterService.getNodes()) {
443 if (node.ip().equals(ip) &&
444 node.tcpPort() == tcpPort) {
445 return node.id();
446 }
447 }
448 return null;
449 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800450}