blob: d84df2288534881935ad4fcf789432b95dd01551 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08005import java.io.File;
6import java.io.IOException;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08007import java.util.Collection;
8import java.util.Collections;
9import java.util.HashSet;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080010import java.util.Map;
11import java.util.Set;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080012import java.util.concurrent.CountDownLatch;
13import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080014
15import net.kuujo.copycat.Copycat;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080016import net.kuujo.copycat.cluster.ClusterConfig;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080017import net.kuujo.copycat.cluster.Member;
Madan Jampani08822c42014-11-04 17:17:46 -080018import net.kuujo.copycat.cluster.TcpCluster;
19import net.kuujo.copycat.cluster.TcpClusterConfig;
20import net.kuujo.copycat.cluster.TcpMember;
Madan Jampanif5d263b2014-11-13 10:04:40 -080021import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampani08822c42014-11-04 17:17:46 -080022import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080023
Madan Jampani08822c42014-11-04 17:17:46 -080024import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080025import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080026import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080027import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080029import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080030import org.onlab.onos.cluster.ClusterEvent;
31import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080032import org.onlab.onos.cluster.ClusterService;
33import org.onlab.onos.cluster.ControllerNode;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080034import org.onlab.onos.cluster.DefaultControllerNode;
35import org.onlab.onos.cluster.NodeId;
Madan Jampanidef2c652014-11-12 13:50:10 -080036import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani12390c12014-11-12 00:35:56 -080037import org.onlab.onos.store.service.BatchReadRequest;
38import org.onlab.onos.store.service.BatchReadResult;
39import org.onlab.onos.store.service.BatchWriteRequest;
40import org.onlab.onos.store.service.BatchWriteResult;
Madan Jampani08822c42014-11-04 17:17:46 -080041import org.onlab.onos.store.service.DatabaseAdminService;
42import org.onlab.onos.store.service.DatabaseException;
43import org.onlab.onos.store.service.DatabaseService;
Madan Jampani08822c42014-11-04 17:17:46 -080044import org.onlab.onos.store.service.ReadResult;
Madan Jampani12390c12014-11-12 00:35:56 -080045import org.onlab.onos.store.service.ReadStatus;
46import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080047import org.onlab.onos.store.service.WriteResult;
Madan Jampani12390c12014-11-12 00:35:56 -080048import org.onlab.onos.store.service.WriteStatus;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080049import org.onlab.packet.IpAddress;
Madan Jampani08822c42014-11-04 17:17:46 -080050import org.slf4j.Logger;
51
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080052import com.google.common.collect.ImmutableList;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080053
Madan Jampani08822c42014-11-04 17:17:46 -080054/**
55 * Strongly consistent and durable state management service based on
56 * Copycat implementation of Raft consensus protocol.
57 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080058@Component(immediate = true)
59@Service
Madan Jampani08822c42014-11-04 17:17:46 -080060public class DatabaseManager implements DatabaseService, DatabaseAdminService {
61
62 private final Logger log = getLogger(getClass());
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080065 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080066
Madan Jampani9b19a822014-11-04 21:37:13 -080067 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanidef2c652014-11-12 13:50:10 -080068 protected ClusterCommunicationService clusterCommunicator;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080071 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080072
Yuta HIGUCHI13a6f5a2014-11-12 10:07:47 -080073 // FIXME: point to appropriate path
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080074 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log_";
75
76 // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
Pavlin Radoslavov190f8f92014-11-11 15:56:14 -080077 // TODO: Set the path to /opt/onos/config
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080078 private static final String CONFIG_DIR = "../config";
79
80 private static final String DEFAULT_MEMBER_FILE = "tablets.json";
81
82 private static final String DEFAULT_TABLET = "default";
83
84 // TODO: make this configurable
85 // initial member configuration file path
86 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
Madan Jampani08822c42014-11-04 17:17:46 -080087
88 private Copycat copycat;
89 private DatabaseClient client;
90
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080091 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080092 private ClusterConfig<TcpMember> clusterConfig;
93
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080094 private CountDownLatch clusterEventLatch;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080095 private ClusterEventListener clusterEventListener;
96
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080097 private Map<String, Set<DefaultControllerNode>> tabletMembers;
98
99 private boolean autoAddMember = false;
100
Madan Jampani08822c42014-11-04 17:17:46 -0800101 @Activate
102 public void activate() {
Madan Jampanidfbfa182014-11-04 22:06:41 -0800103
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800104 // TODO: Not every node should be part of the consensus ring.
Madan Jampanidfbfa182014-11-04 22:06:41 -0800105
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800106 // load tablet configuration
107 File file = new File(CONFIG_DIR, initialMemberConfig);
108 log.info("Loading config: {}", file.getAbsolutePath());
109 TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
110 try {
111 tabletMembers = tabletDef.read();
112 } catch (IOException e) {
113 log.error("Failed to load tablet config {}", file);
114 throw new IllegalStateException("Failed to load tablet config", e);
115 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800116
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800117 // load default tablet configuration and start copycat
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800118 clusterConfig = new TcpClusterConfig();
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800119 Set<DefaultControllerNode> defaultMember = tabletMembers.get(DEFAULT_TABLET);
120 if (defaultMember == null || defaultMember.isEmpty()) {
121 log.error("No member found in [{}] tablet configuration.",
122 DEFAULT_TABLET);
123 throw new IllegalStateException("No member found in tablet configuration");
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800124
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800125 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800126
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800127 final ControllerNode localNode = clusterService.getLocalNode();
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800128 for (ControllerNode member : defaultMember) {
129 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
130 member.tcpPort());
131 if (localNode.equals(member)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800132 clusterConfig.setLocalMember(tcpMember);
133 } else {
134 clusterConfig.addRemoteMember(tcpMember);
135 }
136 }
137
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800138 // note: from this point beyond, clusterConfig requires synchronization
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800139 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800140 clusterEventListener = new InternalClusterEventListener();
141 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800142
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800143 if (clusterService.getNodes().size() < clusterConfig.getMembers().size()) {
144 // current cluster size smaller then expected
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800145 try {
146 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800147 log.info("Starting with {}/{} nodes cluster",
148 clusterService.getNodes().size(),
149 clusterConfig.getMembers().size());
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800150 }
151 } catch (InterruptedException e) {
152 log.info("Interrupted waiting for others", e);
153 }
154 }
Madan Jampani08822c42014-11-04 17:17:46 -0800155
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800156 final TcpCluster cluster;
157 synchronized (clusterConfig) {
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800158 // Create the cluster.
159 cluster = new TcpCluster(clusterConfig);
160 }
161 log.info("Starting cluster: {}", cluster);
162
Madan Jampanif5d263b2014-11-13 10:04:40 -0800163 DatabaseEntryExpirationTracker expirationTracker =
164 new DatabaseEntryExpirationTracker(
165 clusterConfig.getLocalMember(),
166 clusterService.getLocalNode(),
167 clusterCommunicator,
168 this);
Madan Jampani08822c42014-11-04 17:17:46 -0800169
Madan Jampanidef2c652014-11-12 13:50:10 -0800170 DatabaseStateMachine stateMachine = new DatabaseStateMachine();
Madan Jampanif5d263b2014-11-13 10:04:40 -0800171 stateMachine.addEventListener(expirationTracker);
Madan Jampani2ee20002014-11-06 20:06:12 -0800172 Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800173 ClusterMessagingProtocol.SERIALIZER);
Madan Jampani08822c42014-11-04 17:17:46 -0800174
Madan Jampani9b19a822014-11-04 21:37:13 -0800175 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampanif5d263b2014-11-13 10:04:40 -0800176
177 copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
178
Madan Jampani08822c42014-11-04 17:17:46 -0800179 copycat.start();
180
Yuta HIGUCHIf8468442014-11-11 10:09:20 -0800181 client = new DatabaseClient(copycat);
Madan Jampani08822c42014-11-04 17:17:46 -0800182
183 log.info("Started.");
184 }
185
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800186 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800187 public void deactivate() {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800188 clusterService.removeListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800189 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800190 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800191 }
192
193 @Override
194 public boolean createTable(String name) {
195 return client.createTable(name);
196 }
197
198 @Override
Madan Jampanidef2c652014-11-12 13:50:10 -0800199 public boolean createTable(String name, int ttlMillis) {
200 return client.createTable(name, ttlMillis);
201 }
202
203 @Override
Madan Jampani08822c42014-11-04 17:17:46 -0800204 public void dropTable(String name) {
205 client.dropTable(name);
206 }
207
208 @Override
209 public void dropAllTables() {
210 client.dropAllTables();
211 }
212
213 @Override
Madan Jampanif5d263b2014-11-13 10:04:40 -0800214 public Set<String> listTables() {
Madan Jampani08822c42014-11-04 17:17:46 -0800215 return client.listTables();
216 }
217
218 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800219 public VersionedValue get(String tableName, String key) {
220 BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
221 ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
222 if (readResult.status().equals(ReadStatus.OK)) {
223 return readResult.value();
Madan Jampani08822c42014-11-04 17:17:46 -0800224 }
Madan Jampani12390c12014-11-12 00:35:56 -0800225 throw new DatabaseException("get failed due to status: " + readResult.status());
Madan Jampani08822c42014-11-04 17:17:46 -0800226 }
227
228 @Override
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800229 public Map<String, VersionedValue> getAll(String tableName) {
230 return client.getAll(tableName);
231 }
232
233
234 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800235 public BatchReadResult batchRead(BatchReadRequest batchRequest) {
236 return new BatchReadResult(client.batchRead(batchRequest));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800237 }
238
239 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800240 public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
241 return new BatchWriteResult(client.batchWrite(batchRequest));
Madan Jampani08822c42014-11-04 17:17:46 -0800242 }
243
244 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800245 public VersionedValue put(String tableName, String key, byte[] value) {
246 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
247 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
248 if (writeResult.status().equals(WriteStatus.OK)) {
249 return writeResult.previousValue();
Madan Jampani08822c42014-11-04 17:17:46 -0800250 }
Madan Jampani12390c12014-11-12 00:35:56 -0800251 throw new DatabaseException("put failed due to status: " + writeResult.status());
252 }
Madan Jampani08822c42014-11-04 17:17:46 -0800253
Madan Jampani12390c12014-11-12 00:35:56 -0800254 @Override
255 public boolean putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800256 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
257 .putIfAbsent(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800258 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
259 if (writeResult.status().equals(WriteStatus.OK)) {
260 return true;
261 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
262 return false;
263 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800264 throw new DatabaseException("putIfAbsent failed due to status: "
265 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800266 }
267
268 @Override
269 public boolean putIfVersionMatches(String tableName, String key,
270 byte[] value, long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800271 BatchWriteRequest batchRequest =
272 new BatchWriteRequest.Builder()
273 .putIfVersionMatches(tableName, key, value, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800274 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
275 if (writeResult.status().equals(WriteStatus.OK)) {
276 return true;
277 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
278 return false;
279 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800280 throw new DatabaseException("putIfVersionMatches failed due to status: "
281 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800282 }
283
284 @Override
285 public boolean putIfValueMatches(String tableName, String key,
286 byte[] oldValue, byte[] newValue) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800287 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
288 .putIfValueMatches(tableName, key, oldValue, newValue).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800289 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
290 if (writeResult.status().equals(WriteStatus.OK)) {
291 return true;
292 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
293 return false;
294 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800295 throw new DatabaseException("putIfValueMatches failed due to status: "
296 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800297 }
298
299 @Override
300 public VersionedValue remove(String tableName, String key) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800301 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
302 .remove(tableName, key).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800303 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
304 if (writeResult.status().equals(WriteStatus.OK)) {
305 return writeResult.previousValue();
306 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800307 throw new DatabaseException("remove failed due to status: "
308 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800309 }
310
311 @Override
312 public boolean removeIfVersionMatches(String tableName, String key,
313 long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800314 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
315 .removeIfVersionMatches(tableName, key, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800316 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
317 if (writeResult.status().equals(WriteStatus.OK)) {
318 return true;
319 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
320 return false;
321 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800322 throw new DatabaseException("removeIfVersionMatches failed due to status: "
323 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800324 }
325
326 @Override
327 public boolean removeIfValueMatches(String tableName, String key,
328 byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800329 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
330 .removeIfValueMatches(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800331 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
332 if (writeResult.status().equals(WriteStatus.OK)) {
333 return true;
334 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
335 return false;
336 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800337 throw new DatabaseException("removeIfValueMatches failed due to status: "
338 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800339 }
340
341 @Override
342 public void addMember(final ControllerNode node) {
343 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
344 node.tcpPort());
345 log.info("{} was added to the cluster", tcpMember);
346 synchronized (clusterConfig) {
347 clusterConfig.addRemoteMember(tcpMember);
348 }
Madan Jampani08822c42014-11-04 17:17:46 -0800349 }
350
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800351 private final class InternalClusterEventListener
Madan Jampani12390c12014-11-12 00:35:56 -0800352 implements ClusterEventListener {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800353
354 @Override
355 public void event(ClusterEvent event) {
356 // TODO: Not every node should be part of the consensus ring.
357
358 final ControllerNode node = event.subject();
359 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
Madan Jampani12390c12014-11-12 00:35:56 -0800360 node.tcpPort());
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800361
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800362 switch (event.type()) {
363 case INSTANCE_ACTIVATED:
364 case INSTANCE_ADDED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800365 if (autoAddMember) {
366 synchronized (clusterConfig) {
367 if (!clusterConfig.getMembers().contains(tcpMember)) {
368 log.info("{} was automatically added to the cluster", tcpMember);
369 clusterConfig.addRemoteMember(tcpMember);
370 }
371 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800372 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800373 break;
374 case INSTANCE_DEACTIVATED:
375 case INSTANCE_REMOVED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800376 if (autoAddMember) {
377 Set<DefaultControllerNode> members
Madan Jampani12390c12014-11-12 00:35:56 -0800378 = tabletMembers.getOrDefault(DEFAULT_TABLET,
379 Collections.emptySet());
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800380 // remove only if not the initial members
381 if (!members.contains(node)) {
382 synchronized (clusterConfig) {
383 if (clusterConfig.getMembers().contains(tcpMember)) {
384 log.info("{} was automatically removed from the cluster", tcpMember);
385 clusterConfig.removeRemoteMember(tcpMember);
386 }
387 }
388 }
389 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800390 break;
391 default:
392 break;
393 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800394 if (copycat != null) {
395 log.debug("Current cluster: {}", copycat.cluster());
396 }
397 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800398 }
399
400 }
401
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800402 @Override
403 public void removeMember(final ControllerNode node) {
404 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
405 node.tcpPort());
406 log.info("{} was removed from the cluster", tcpMember);
407 synchronized (clusterConfig) {
408 clusterConfig.removeRemoteMember(tcpMember);
409 }
410 }
411
412 @Override
413 public Collection<ControllerNode> listMembers() {
414 if (copycat == null) {
415 return ImmutableList.of();
416 }
417 Set<ControllerNode> members = new HashSet<>();
418 for (Member member : copycat.cluster().members()) {
419 if (member instanceof TcpMember) {
420 final TcpMember tcpMember = (TcpMember) member;
421 // TODO assuming tcpMember#host to be IP address,
422 // but if not lookup DNS, etc. first
423 IpAddress ip = IpAddress.valueOf(tcpMember.host());
424 int tcpPort = tcpMember.port();
425 NodeId id = getNodeIdFromIp(ip, tcpPort);
426 if (id == null) {
427 log.info("No NodeId found for {}:{}", ip, tcpPort);
428 continue;
429 }
430 members.add(new DefaultControllerNode(id, ip, tcpPort));
431 }
432 }
433 return members;
434 }
435
436 private NodeId getNodeIdFromIp(IpAddress ip, int tcpPort) {
437 for (ControllerNode node : clusterService.getNodes()) {
438 if (node.ip().equals(ip) &&
439 node.tcpPort() == tcpPort) {
440 return node.id();
441 }
442 }
443 return null;
444 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800445}