blob: dd0b379b5369089fe4caa9959ccfb90a92f8b0e9 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -08003import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
4import static org.onlab.util.Tools.namedThreads;
Madan Jampani08822c42014-11-04 17:17:46 -08005import static org.slf4j.LoggerFactory.getLogger;
6
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08007import java.io.File;
8import java.io.IOException;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08009import java.util.Collection;
10import java.util.Collections;
11import java.util.HashSet;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080012import java.util.Map;
13import java.util.Set;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080014import java.util.concurrent.CountDownLatch;
Yuta HIGUCHI39da9792014-11-14 02:07:04 -080015import java.util.concurrent.ExecutionException;
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -080016import java.util.concurrent.ScheduledExecutorService;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080017import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080018
19import net.kuujo.copycat.Copycat;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080020import net.kuujo.copycat.cluster.ClusterConfig;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080021import net.kuujo.copycat.cluster.Member;
Madan Jampani08822c42014-11-04 17:17:46 -080022import net.kuujo.copycat.cluster.TcpCluster;
23import net.kuujo.copycat.cluster.TcpClusterConfig;
24import net.kuujo.copycat.cluster.TcpMember;
Madan Jampani5ce30252014-11-17 20:53:17 -080025import net.kuujo.copycat.event.EventHandler;
Madan Jampanif5d263b2014-11-13 10:04:40 -080026import net.kuujo.copycat.event.LeaderElectEvent;
Madan Jampani08822c42014-11-04 17:17:46 -080027import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080028
Madan Jampani08822c42014-11-04 17:17:46 -080029import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080030import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080031import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080032import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080034import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080035import org.onlab.onos.cluster.ClusterEvent;
36import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080037import org.onlab.onos.cluster.ClusterService;
38import org.onlab.onos.cluster.ControllerNode;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080039import org.onlab.onos.cluster.DefaultControllerNode;
40import org.onlab.onos.cluster.NodeId;
Madan Jampanidef2c652014-11-12 13:50:10 -080041import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
Madan Jampani5ce30252014-11-17 20:53:17 -080042import org.onlab.onos.store.cluster.messaging.ClusterMessage;
43import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampani12390c12014-11-12 00:35:56 -080044import org.onlab.onos.store.service.BatchReadRequest;
45import org.onlab.onos.store.service.BatchReadResult;
46import org.onlab.onos.store.service.BatchWriteRequest;
47import org.onlab.onos.store.service.BatchWriteResult;
Madan Jampani08822c42014-11-04 17:17:46 -080048import org.onlab.onos.store.service.DatabaseAdminService;
49import org.onlab.onos.store.service.DatabaseException;
50import org.onlab.onos.store.service.DatabaseService;
Madan Jampani08822c42014-11-04 17:17:46 -080051import org.onlab.onos.store.service.ReadResult;
Madan Jampani12390c12014-11-12 00:35:56 -080052import org.onlab.onos.store.service.ReadStatus;
53import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080054import org.onlab.onos.store.service.WriteResult;
Madan Jampani12390c12014-11-12 00:35:56 -080055import org.onlab.onos.store.service.WriteStatus;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080056import org.onlab.packet.IpAddress;
Madan Jampani08822c42014-11-04 17:17:46 -080057import org.slf4j.Logger;
58
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080059import com.google.common.collect.ImmutableList;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080060
Madan Jampani08822c42014-11-04 17:17:46 -080061/**
62 * Strongly consistent and durable state management service based on
63 * Copycat implementation of Raft consensus protocol.
64 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080065@Component(immediate = true)
66@Service
Madan Jampani08822c42014-11-04 17:17:46 -080067public class DatabaseManager implements DatabaseService, DatabaseAdminService {
68
69 private final Logger log = getLogger(getClass());
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080072 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080073
Madan Jampani9b19a822014-11-04 21:37:13 -080074 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampanidef2c652014-11-12 13:50:10 -080075 protected ClusterCommunicationService clusterCommunicator;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080078 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080079
Yuta HIGUCHI13a6f5a2014-11-12 10:07:47 -080080 // FIXME: point to appropriate path
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080081 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log_";
82
83 // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
Pavlin Radoslavov190f8f92014-11-11 15:56:14 -080084 // TODO: Set the path to /opt/onos/config
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080085 private static final String CONFIG_DIR = "../config";
86
87 private static final String DEFAULT_MEMBER_FILE = "tablets.json";
88
89 private static final String DEFAULT_TABLET = "default";
90
91 // TODO: make this configurable
92 // initial member configuration file path
93 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
Madan Jampani08822c42014-11-04 17:17:46 -080094
Madan Jampani5ce30252014-11-17 20:53:17 -080095 public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
96 new MessageSubject("raft-leader-election-event");
97
Madan Jampani08822c42014-11-04 17:17:46 -080098 private Copycat copycat;
99 private DatabaseClient client;
100
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800101 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800102 private ClusterConfig<TcpMember> clusterConfig;
103
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800104 private CountDownLatch clusterEventLatch;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800105 private ClusterEventListener clusterEventListener;
106
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800107 private Map<String, Set<DefaultControllerNode>> tabletMembers;
108
109 private boolean autoAddMember = false;
110
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800111 private ScheduledExecutorService executor;
112
113 private volatile LeaderElectEvent myLeaderEvent = null;
114
Madan Jampani08822c42014-11-04 17:17:46 -0800115 @Activate
Yuta HIGUCHI39da9792014-11-14 02:07:04 -0800116 public void activate() throws InterruptedException, ExecutionException {
Madan Jampanidfbfa182014-11-04 22:06:41 -0800117
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800118 // load tablet configuration
119 File file = new File(CONFIG_DIR, initialMemberConfig);
120 log.info("Loading config: {}", file.getAbsolutePath());
121 TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
122 try {
123 tabletMembers = tabletDef.read();
124 } catch (IOException e) {
125 log.error("Failed to load tablet config {}", file);
126 throw new IllegalStateException("Failed to load tablet config", e);
127 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800128
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800129 // load default tablet configuration and start copycat
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800130 clusterConfig = new TcpClusterConfig();
Madan Jampani5ce30252014-11-17 20:53:17 -0800131 Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
132 if (defaultMembers == null || defaultMembers.isEmpty()) {
133 log.error("No members found in [{}] tablet configuration.",
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800134 DEFAULT_TABLET);
135 throw new IllegalStateException("No member found in tablet configuration");
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800136
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800137 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800138
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800139 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani5ce30252014-11-17 20:53:17 -0800140 for (ControllerNode member : defaultMembers) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800141 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
142 member.tcpPort());
143 if (localNode.equals(member)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800144 clusterConfig.setLocalMember(tcpMember);
145 } else {
146 clusterConfig.addRemoteMember(tcpMember);
147 }
148 }
149
Madan Jampani5ce30252014-11-17 20:53:17 -0800150 if (clusterConfig.getLocalMember() != null) {
151
152 // Wait for a minimum viable Raft cluster to boot up.
153 waitForClusterQuorum();
154
155 final TcpCluster cluster;
156 synchronized (clusterConfig) {
157 // Create the cluster.
158 cluster = new TcpCluster(clusterConfig);
159 }
160 log.info("Starting cluster: {}", cluster);
161
162 DatabaseEntryExpirationTracker expirationTracker =
163 new DatabaseEntryExpirationTracker(
164 clusterConfig.getLocalMember(),
165 clusterService.getLocalNode(),
166 clusterCommunicator,
167 this);
168
169 DatabaseStateMachine stateMachine = new DatabaseStateMachine();
170 stateMachine.addEventListener(expirationTracker);
171 Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
172 ClusterMessagingProtocol.SERIALIZER);
173
174 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
175 copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
176 copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
177 }
178
179 client = new DatabaseClient(copycatMessagingProtocol);
180 clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
181
182 // Starts copycat if this node is a participant
183 // of the Raft cluster.
184 if (copycat != null) {
185 copycat.start().get();
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800186
187 executor =
188 newSingleThreadScheduledExecutor(namedThreads("db-heartbeat-%d"));
189 executor.scheduleWithFixedDelay(new LeaderAdvertiser(), 5, 2, TimeUnit.SECONDS);
190
Madan Jampani5ce30252014-11-17 20:53:17 -0800191 }
192
193 client.waitForLeader();
Madan Jampani71582ed2014-11-18 10:06:01 -0800194
195 // Try and list the tables to verify database manager is
196 // in a state where it can serve requests.
197 tryTableListing();
198
Madan Jampani5ce30252014-11-17 20:53:17 -0800199 log.info("Started.");
200 }
201
202 @Deactivate
203 public void deactivate() {
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800204 if (executor != null) {
205 executor.shutdownNow();
206 }
Madan Jampani5ce30252014-11-17 20:53:17 -0800207 clusterService.removeListener(clusterEventListener);
208 // TODO: ClusterCommunicationService must support more than one
209 // handler per message subject.
210 clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
211 if (copycat != null) {
212 copycat.stop();
213 }
214 log.info("Stopped.");
215 }
216
217 private void waitForClusterQuorum() {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800218 // note: from this point beyond, clusterConfig requires synchronization
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800219 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800220 clusterEventListener = new InternalClusterEventListener();
221 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800222
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800223 if (clusterService.getNodes().size() < clusterConfig.getMembers().size()) {
224 // current cluster size smaller then expected
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800225 try {
226 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800227 log.info("Starting with {}/{} nodes cluster",
228 clusterService.getNodes().size(),
229 clusterConfig.getMembers().size());
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800230 }
231 } catch (InterruptedException e) {
232 log.info("Interrupted waiting for others", e);
233 }
234 }
Madan Jampani08822c42014-11-04 17:17:46 -0800235 }
236
Madan Jampani71582ed2014-11-18 10:06:01 -0800237 private void tryTableListing() {
238 int retries = 0;
239 do {
240 try {
241 listTables();
242 return;
243 } catch (DatabaseException e) {
244 if (retries == 10) {
245 log.error("Failed to listTables after multiple attempts. Giving up.", e);
246 throw e;
247 } else {
248 log.debug("Failed to listTables. Will retry...", e);
Madan Jampani71582ed2014-11-18 10:06:01 -0800249 }
250 }
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800251 retries++;
Madan Jampani71582ed2014-11-18 10:06:01 -0800252 } while (true);
253 }
254
Madan Jampani08822c42014-11-04 17:17:46 -0800255 @Override
256 public boolean createTable(String name) {
257 return client.createTable(name);
258 }
259
260 @Override
Madan Jampanidef2c652014-11-12 13:50:10 -0800261 public boolean createTable(String name, int ttlMillis) {
262 return client.createTable(name, ttlMillis);
263 }
264
265 @Override
Madan Jampani08822c42014-11-04 17:17:46 -0800266 public void dropTable(String name) {
267 client.dropTable(name);
268 }
269
270 @Override
271 public void dropAllTables() {
272 client.dropAllTables();
273 }
274
275 @Override
Madan Jampanif5d263b2014-11-13 10:04:40 -0800276 public Set<String> listTables() {
Madan Jampani08822c42014-11-04 17:17:46 -0800277 return client.listTables();
278 }
279
280 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800281 public VersionedValue get(String tableName, String key) {
282 BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
283 ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
284 if (readResult.status().equals(ReadStatus.OK)) {
285 return readResult.value();
Madan Jampani08822c42014-11-04 17:17:46 -0800286 }
Madan Jampani12390c12014-11-12 00:35:56 -0800287 throw new DatabaseException("get failed due to status: " + readResult.status());
Madan Jampani08822c42014-11-04 17:17:46 -0800288 }
289
290 @Override
Yuta HIGUCHI841c0b62014-11-13 20:27:14 -0800291 public Map<String, VersionedValue> getAll(String tableName) {
292 return client.getAll(tableName);
293 }
294
295
296 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800297 public BatchReadResult batchRead(BatchReadRequest batchRequest) {
298 return new BatchReadResult(client.batchRead(batchRequest));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800299 }
300
301 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800302 public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
303 return new BatchWriteResult(client.batchWrite(batchRequest));
Madan Jampani08822c42014-11-04 17:17:46 -0800304 }
305
306 @Override
Madan Jampani12390c12014-11-12 00:35:56 -0800307 public VersionedValue put(String tableName, String key, byte[] value) {
308 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
309 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
310 if (writeResult.status().equals(WriteStatus.OK)) {
311 return writeResult.previousValue();
Madan Jampani08822c42014-11-04 17:17:46 -0800312 }
Madan Jampani12390c12014-11-12 00:35:56 -0800313 throw new DatabaseException("put failed due to status: " + writeResult.status());
314 }
Madan Jampani08822c42014-11-04 17:17:46 -0800315
Madan Jampani12390c12014-11-12 00:35:56 -0800316 @Override
317 public boolean putIfAbsent(String tableName, String key, byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800318 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
319 .putIfAbsent(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800320 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
321 if (writeResult.status().equals(WriteStatus.OK)) {
322 return true;
323 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
324 return false;
325 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800326 throw new DatabaseException("putIfAbsent failed due to status: "
327 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800328 }
329
330 @Override
331 public boolean putIfVersionMatches(String tableName, String key,
332 byte[] value, long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800333 BatchWriteRequest batchRequest =
334 new BatchWriteRequest.Builder()
335 .putIfVersionMatches(tableName, key, value, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800336 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
337 if (writeResult.status().equals(WriteStatus.OK)) {
338 return true;
339 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
340 return false;
341 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800342 throw new DatabaseException("putIfVersionMatches failed due to status: "
343 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800344 }
345
346 @Override
347 public boolean putIfValueMatches(String tableName, String key,
348 byte[] oldValue, byte[] newValue) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800349 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
350 .putIfValueMatches(tableName, key, oldValue, newValue).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800351 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
352 if (writeResult.status().equals(WriteStatus.OK)) {
353 return true;
354 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
355 return false;
356 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800357 throw new DatabaseException("putIfValueMatches failed due to status: "
358 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800359 }
360
361 @Override
362 public VersionedValue remove(String tableName, String key) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800363 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
364 .remove(tableName, key).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800365 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
366 if (writeResult.status().equals(WriteStatus.OK)) {
367 return writeResult.previousValue();
368 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800369 throw new DatabaseException("remove failed due to status: "
370 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800371 }
372
373 @Override
374 public boolean removeIfVersionMatches(String tableName, String key,
375 long version) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800376 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
377 .removeIfVersionMatches(tableName, key, version).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800378 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
379 if (writeResult.status().equals(WriteStatus.OK)) {
380 return true;
381 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
382 return false;
383 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800384 throw new DatabaseException("removeIfVersionMatches failed due to status: "
385 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800386 }
387
388 @Override
389 public boolean removeIfValueMatches(String tableName, String key,
390 byte[] value) {
Madan Jampani44e6a542014-11-12 01:06:51 -0800391 BatchWriteRequest batchRequest = new BatchWriteRequest.Builder()
392 .removeIfValueMatches(tableName, key, value).build();
Madan Jampani12390c12014-11-12 00:35:56 -0800393 WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
394 if (writeResult.status().equals(WriteStatus.OK)) {
395 return true;
396 } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
397 return false;
398 }
Madan Jampani44e6a542014-11-12 01:06:51 -0800399 throw new DatabaseException("removeIfValueMatches failed due to status: "
400 + writeResult.status());
Madan Jampani12390c12014-11-12 00:35:56 -0800401 }
402
403 @Override
404 public void addMember(final ControllerNode node) {
405 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
406 node.tcpPort());
407 log.info("{} was added to the cluster", tcpMember);
408 synchronized (clusterConfig) {
409 clusterConfig.addRemoteMember(tcpMember);
410 }
Madan Jampani08822c42014-11-04 17:17:46 -0800411 }
412
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800413 private final class LeaderAdvertiser implements Runnable {
414
Madan Jampani5ce30252014-11-17 20:53:17 -0800415 @Override
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800416 public void run() {
Madan Jampani5ce30252014-11-17 20:53:17 -0800417 try {
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800418 LeaderElectEvent event = myLeaderEvent;
419 if (event != null) {
420 log.trace("Broadcasting RAFT_LEADER_ELECTION_EVENT: {}", event);
Madan Jampani5ce30252014-11-17 20:53:17 -0800421 // This node just became the leader.
422 clusterCommunicator.broadcastIncludeSelf(
423 new ClusterMessage(
424 clusterService.getLocalNode().id(),
425 RAFT_LEADER_ELECTION_EVENT,
426 ClusterMessagingProtocol.SERIALIZER.encode(event)));
427 }
Yuta HIGUCHIf8535f32014-11-20 11:11:01 -0800428 } catch (Exception e) {
429 log.debug("LeaderAdvertiser failed with exception", e);
430 }
431 }
432
433 }
434
435 private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
436 @Override
437 public void handle(LeaderElectEvent event) {
438 try {
439 log.debug("Received LeaderElectEvent: {}", event);
440 if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
441 log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
442 myLeaderEvent = event;
443 // This node just became the leader.
444 clusterCommunicator.broadcastIncludeSelf(
445 new ClusterMessage(
446 clusterService.getLocalNode().id(),
447 RAFT_LEADER_ELECTION_EVENT,
448 ClusterMessagingProtocol.SERIALIZER.encode(event)));
449 } else {
450 if (myLeaderEvent != null) {
451 log.debug("This node is no longer the Leader");
452 }
453 myLeaderEvent = null;
454 }
Madan Jampani5ce30252014-11-17 20:53:17 -0800455 } catch (IOException e) {
456 log.error("Failed to broadcast raft leadership change event", e);
457 }
458 }
459 }
460
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800461 private final class InternalClusterEventListener
Madan Jampani12390c12014-11-12 00:35:56 -0800462 implements ClusterEventListener {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800463
464 @Override
465 public void event(ClusterEvent event) {
466 // TODO: Not every node should be part of the consensus ring.
467
468 final ControllerNode node = event.subject();
469 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
Madan Jampani12390c12014-11-12 00:35:56 -0800470 node.tcpPort());
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800471
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800472 switch (event.type()) {
473 case INSTANCE_ACTIVATED:
474 case INSTANCE_ADDED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800475 if (autoAddMember) {
476 synchronized (clusterConfig) {
477 if (!clusterConfig.getMembers().contains(tcpMember)) {
478 log.info("{} was automatically added to the cluster", tcpMember);
479 clusterConfig.addRemoteMember(tcpMember);
480 }
481 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800482 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800483 break;
484 case INSTANCE_DEACTIVATED:
485 case INSTANCE_REMOVED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800486 if (autoAddMember) {
487 Set<DefaultControllerNode> members
Madan Jampani12390c12014-11-12 00:35:56 -0800488 = tabletMembers.getOrDefault(DEFAULT_TABLET,
489 Collections.emptySet());
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800490 // remove only if not the initial members
491 if (!members.contains(node)) {
492 synchronized (clusterConfig) {
493 if (clusterConfig.getMembers().contains(tcpMember)) {
494 log.info("{} was automatically removed from the cluster", tcpMember);
495 clusterConfig.removeRemoteMember(tcpMember);
496 }
497 }
498 }
499 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800500 break;
501 default:
502 break;
503 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800504 if (copycat != null) {
505 log.debug("Current cluster: {}", copycat.cluster());
506 }
507 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800508 }
509
510 }
511
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800512 @Override
513 public void removeMember(final ControllerNode node) {
514 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
515 node.tcpPort());
516 log.info("{} was removed from the cluster", tcpMember);
517 synchronized (clusterConfig) {
518 clusterConfig.removeRemoteMember(tcpMember);
519 }
520 }
521
522 @Override
523 public Collection<ControllerNode> listMembers() {
524 if (copycat == null) {
525 return ImmutableList.of();
526 }
527 Set<ControllerNode> members = new HashSet<>();
528 for (Member member : copycat.cluster().members()) {
529 if (member instanceof TcpMember) {
530 final TcpMember tcpMember = (TcpMember) member;
531 // TODO assuming tcpMember#host to be IP address,
532 // but if not lookup DNS, etc. first
533 IpAddress ip = IpAddress.valueOf(tcpMember.host());
534 int tcpPort = tcpMember.port();
535 NodeId id = getNodeIdFromIp(ip, tcpPort);
536 if (id == null) {
537 log.info("No NodeId found for {}:{}", ip, tcpPort);
538 continue;
539 }
540 members.add(new DefaultControllerNode(id, ip, tcpPort));
541 }
542 }
543 return members;
544 }
545
546 private NodeId getNodeIdFromIp(IpAddress ip, int tcpPort) {
547 for (ControllerNode node : clusterService.getNodes()) {
548 if (node.ip().equals(ip) &&
549 node.tcpPort() == tcpPort) {
550 return node.id();
551 }
552 }
553 return null;
554 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800555}