blob: f328842e934e70cb65a31f21f380516188c980ac [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08005import java.io.File;
6import java.io.IOException;
Madan Jampani08822c42014-11-04 17:17:46 -08007import java.util.ArrayList;
8import java.util.Arrays;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08009import java.util.Collection;
10import java.util.Collections;
11import java.util.HashSet;
Madan Jampani08822c42014-11-04 17:17:46 -080012import java.util.List;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080013import java.util.Map;
14import java.util.Set;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080015import java.util.concurrent.CountDownLatch;
16import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080017
18import net.kuujo.copycat.Copycat;
19import net.kuujo.copycat.StateMachine;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080020import net.kuujo.copycat.cluster.ClusterConfig;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080021import net.kuujo.copycat.cluster.Member;
Madan Jampani08822c42014-11-04 17:17:46 -080022import net.kuujo.copycat.cluster.TcpCluster;
23import net.kuujo.copycat.cluster.TcpClusterConfig;
24import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080025import net.kuujo.copycat.log.InMemoryLog;
Madan Jampani08822c42014-11-04 17:17:46 -080026import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080027
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080028import org.apache.commons.lang3.RandomUtils;
Madan Jampani08822c42014-11-04 17:17:46 -080029import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080030import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080031import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080032import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080034import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080035import org.onlab.onos.cluster.ClusterEvent;
36import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080037import org.onlab.onos.cluster.ClusterService;
38import org.onlab.onos.cluster.ControllerNode;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080039import org.onlab.onos.cluster.DefaultControllerNode;
40import org.onlab.onos.cluster.NodeId;
Madan Jampani08822c42014-11-04 17:17:46 -080041import org.onlab.onos.store.service.DatabaseAdminService;
42import org.onlab.onos.store.service.DatabaseException;
43import org.onlab.onos.store.service.DatabaseService;
44import org.onlab.onos.store.service.NoSuchTableException;
45import org.onlab.onos.store.service.OptimisticLockException;
46import org.onlab.onos.store.service.OptionalResult;
Madan Jampani08822c42014-11-04 17:17:46 -080047import org.onlab.onos.store.service.ReadRequest;
48import org.onlab.onos.store.service.ReadResult;
49import org.onlab.onos.store.service.WriteAborted;
50import org.onlab.onos.store.service.WriteRequest;
51import org.onlab.onos.store.service.WriteResult;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080052import org.onlab.packet.IpAddress;
Madan Jampani08822c42014-11-04 17:17:46 -080053import org.slf4j.Logger;
54
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080055import com.google.common.collect.ImmutableList;
56import com.google.common.collect.Iterables;
57
Madan Jampani08822c42014-11-04 17:17:46 -080058/**
59 * Strongly consistent and durable state management service based on
60 * Copycat implementation of Raft consensus protocol.
61 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080062@Component(immediate = true)
63@Service
Madan Jampani08822c42014-11-04 17:17:46 -080064public class DatabaseManager implements DatabaseService, DatabaseAdminService {
65
66 private final Logger log = getLogger(getClass());
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080069 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080070
Madan Jampani9b19a822014-11-04 21:37:13 -080071 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080072 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080073
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080074 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log_";
75
76 // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
77 // TODO: Get the path to /opt/onos/config
78 private static final String CONFIG_DIR = "../config";
79
80 private static final String DEFAULT_MEMBER_FILE = "tablets.json";
81
82 private static final String DEFAULT_TABLET = "default";
83
84 // TODO: make this configurable
85 // initial member configuration file path
86 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
Madan Jampani08822c42014-11-04 17:17:46 -080087
88 private Copycat copycat;
89 private DatabaseClient client;
90
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080091 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080092 private ClusterConfig<TcpMember> clusterConfig;
93
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080094 private CountDownLatch clusterEventLatch;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080095 private ClusterEventListener clusterEventListener;
96
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080097 private Map<String, Set<DefaultControllerNode>> tabletMembers;
98
99 private boolean autoAddMember = false;
100
Madan Jampani08822c42014-11-04 17:17:46 -0800101 @Activate
102 public void activate() {
Madan Jampanidfbfa182014-11-04 22:06:41 -0800103
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800104 // TODO: Not every node should be part of the consensus ring.
Madan Jampanidfbfa182014-11-04 22:06:41 -0800105
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800106 // load tablet configuration
107 File file = new File(CONFIG_DIR, initialMemberConfig);
108 log.info("Loading config: {}", file.getAbsolutePath());
109 TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
110 try {
111 tabletMembers = tabletDef.read();
112 } catch (IOException e) {
113 log.error("Failed to load tablet config {}", file);
114 throw new IllegalStateException("Failed to load tablet config", e);
115 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800116
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800117 // load default tablet configuration and start copycat
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800118 clusterConfig = new TcpClusterConfig();
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800119 Set<DefaultControllerNode> defaultMember = tabletMembers.get(DEFAULT_TABLET);
120 if (defaultMember == null || defaultMember.isEmpty()) {
121 log.error("No member found in [{}] tablet configuration.",
122 DEFAULT_TABLET);
123 throw new IllegalStateException("No member found in tablet configuration");
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800124
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800125 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800126
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800127 final ControllerNode localNode = clusterService.getLocalNode();
128 TcpMember clientHandler = null;
129 for (ControllerNode member : defaultMember) {
130 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
131 member.tcpPort());
132 if (localNode.equals(member)) {
133 clientHandler = tcpMember;
134 clusterConfig.setLocalMember(tcpMember);
135 } else {
136 clusterConfig.addRemoteMember(tcpMember);
137 }
138 }
139
140 // TODO should be removed after DatabaseClient refactoring
141 if (clientHandler == null) {
142 Set<TcpMember> members = clusterConfig.getMembers();
143 if (members.isEmpty()) {
144 log.error("No member found in [{}] tablet configuration.",
145 DEFAULT_TABLET);
146 throw new IllegalStateException("No member found in tablet configuration");
147 }
148 int position = RandomUtils.nextInt(0, members.size());
149 clientHandler = Iterables.get(members, position);
150 }
151
152 // note: from this point beyond, clusterConfig requires synchronization
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800153 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800154 clusterEventListener = new InternalClusterEventListener();
155 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800156
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800157 if (clusterService.getNodes().size() < clusterConfig.getMembers().size()) {
158 // current cluster size smaller then expected
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800159 try {
160 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800161 log.info("Starting with {}/{} nodes cluster",
162 clusterService.getNodes().size(),
163 clusterConfig.getMembers().size());
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800164 }
165 } catch (InterruptedException e) {
166 log.info("Interrupted waiting for others", e);
167 }
168 }
Madan Jampani08822c42014-11-04 17:17:46 -0800169
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800170 final TcpCluster cluster;
171 synchronized (clusterConfig) {
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800172 // Create the cluster.
173 cluster = new TcpCluster(clusterConfig);
174 }
175 log.info("Starting cluster: {}", cluster);
176
Madan Jampani08822c42014-11-04 17:17:46 -0800177
178 StateMachine stateMachine = new DatabaseStateMachine();
Madan Jampani2ee20002014-11-06 20:06:12 -0800179 Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800180 ClusterMessagingProtocol.SERIALIZER);
Madan Jampani08822c42014-11-04 17:17:46 -0800181
Madan Jampani9b19a822014-11-04 21:37:13 -0800182 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampani08822c42014-11-04 17:17:46 -0800183 copycat.start();
184
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800185 // FIXME Redo DatabaseClient. Needs fall back mechanism etc.
186 client = new DatabaseClient(copycatMessagingProtocol.createClient(clientHandler));
Madan Jampani08822c42014-11-04 17:17:46 -0800187
188 log.info("Started.");
189 }
190
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800191 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800192 public void deactivate() {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800193 clusterService.removeListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800194 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800195 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800196 }
197
198 @Override
199 public boolean createTable(String name) {
200 return client.createTable(name);
201 }
202
203 @Override
204 public void dropTable(String name) {
205 client.dropTable(name);
206 }
207
208 @Override
209 public void dropAllTables() {
210 client.dropAllTables();
211 }
212
213 @Override
214 public List<String> listTables() {
215 return client.listTables();
216 }
217
218 @Override
219 public ReadResult read(ReadRequest request) {
220 return batchRead(Arrays.asList(request)).get(0).get();
221 }
222
223 @Override
224 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
225 List<ReadRequest> batch) {
226 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
227 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
228 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
229 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
230 new NoSuchTableException()));
231 } else {
232 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
233 internalReadResult.result()));
234 }
235 }
236 return readResults;
237 }
238
239 @Override
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800240 public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
241 return batchWrite(Arrays.asList(request)).get(0);
242 }
243
244 @Override
Yuta HIGUCHIc6b8f612014-11-06 19:04:13 -0800245 public WriteResult write(WriteRequest request) {
246// throws OptimisticLockException, PreconditionFailedException {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800247 return writeNothrow(request).get();
Madan Jampani08822c42014-11-04 17:17:46 -0800248 }
249
250 @Override
251 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
252 List<WriteRequest> batch) {
253 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
254 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
255 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
256 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
257 new NoSuchTableException()));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800258 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
Madan Jampani08822c42014-11-04 17:17:46 -0800259 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
260 new OptimisticLockException()));
261 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
262 // TODO: throw a different exception?
263 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800264 new OptimisticLockException()));
Madan Jampani08822c42014-11-04 17:17:46 -0800265 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
266 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
267 new WriteAborted()));
268 } else {
269 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
270 internalWriteResult.result()));
271 }
272 }
273 return writeResults;
274
275 }
276
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800277 private final class InternalClusterEventListener
278 implements ClusterEventListener {
279
280 @Override
281 public void event(ClusterEvent event) {
282 // TODO: Not every node should be part of the consensus ring.
283
284 final ControllerNode node = event.subject();
285 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
286 node.tcpPort());
287
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800288 switch (event.type()) {
289 case INSTANCE_ACTIVATED:
290 case INSTANCE_ADDED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800291 if (autoAddMember) {
292 synchronized (clusterConfig) {
293 if (!clusterConfig.getMembers().contains(tcpMember)) {
294 log.info("{} was automatically added to the cluster", tcpMember);
295 clusterConfig.addRemoteMember(tcpMember);
296 }
297 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800298 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800299 break;
300 case INSTANCE_DEACTIVATED:
301 case INSTANCE_REMOVED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800302 if (autoAddMember) {
303 Set<DefaultControllerNode> members
304 = tabletMembers.getOrDefault(DEFAULT_TABLET,
305 Collections.emptySet());
306 // remove only if not the initial members
307 if (!members.contains(node)) {
308 synchronized (clusterConfig) {
309 if (clusterConfig.getMembers().contains(tcpMember)) {
310 log.info("{} was automatically removed from the cluster", tcpMember);
311 clusterConfig.removeRemoteMember(tcpMember);
312 }
313 }
314 }
315 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800316 break;
317 default:
318 break;
319 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800320 if (copycat != null) {
321 log.debug("Current cluster: {}", copycat.cluster());
322 }
323 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800324 }
325
326 }
327
328 public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
329 public KryoRegisteredInMemoryLog() {
330 super();
331 // required to deserialize object across bundles
332 super.kryo.register(TcpMember.class, new TcpMemberSerializer());
333 super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
334 }
335 }
336
Madan Jampani08822c42014-11-04 17:17:46 -0800337 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
338
339 private final R result;
340 private final DatabaseException exception;
341
342 public DatabaseOperationResult(R result) {
343 this.result = result;
344 this.exception = null;
345 }
346
347 public DatabaseOperationResult(DatabaseException exception) {
348 this.result = null;
349 this.exception = exception;
350 }
351
352 @Override
353 public R get() {
354 if (result != null) {
355 return result;
356 }
357 throw exception;
358 }
359
360 @Override
361 public boolean hasValidResult() {
362 return result != null;
363 }
364
365 @Override
366 public String toString() {
367 if (result != null) {
368 return result.toString();
369 } else {
370 return exception.toString();
371 }
372 }
373 }
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800374
375 @Override
376 public void addMember(final ControllerNode node) {
377 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
378 node.tcpPort());
379 log.info("{} was added to the cluster", tcpMember);
380 synchronized (clusterConfig) {
381 clusterConfig.addRemoteMember(tcpMember);
382 }
383 }
384
385 @Override
386 public void removeMember(final ControllerNode node) {
387 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
388 node.tcpPort());
389 log.info("{} was removed from the cluster", tcpMember);
390 synchronized (clusterConfig) {
391 clusterConfig.removeRemoteMember(tcpMember);
392 }
393 }
394
395 @Override
396 public Collection<ControllerNode> listMembers() {
397 if (copycat == null) {
398 return ImmutableList.of();
399 }
400 Set<ControllerNode> members = new HashSet<>();
401 for (Member member : copycat.cluster().members()) {
402 if (member instanceof TcpMember) {
403 final TcpMember tcpMember = (TcpMember) member;
404 // TODO assuming tcpMember#host to be IP address,
405 // but if not lookup DNS, etc. first
406 IpAddress ip = IpAddress.valueOf(tcpMember.host());
407 int tcpPort = tcpMember.port();
408 NodeId id = getNodeIdFromIp(ip, tcpPort);
409 if (id == null) {
410 log.info("No NodeId found for {}:{}", ip, tcpPort);
411 continue;
412 }
413 members.add(new DefaultControllerNode(id, ip, tcpPort));
414 }
415 }
416 return members;
417 }
418
419 private NodeId getNodeIdFromIp(IpAddress ip, int tcpPort) {
420 for (ControllerNode node : clusterService.getNodes()) {
421 if (node.ip().equals(ip) &&
422 node.tcpPort() == tcpPort) {
423 return node.id();
424 }
425 }
426 return null;
427 }
Madan Jampani08822c42014-11-04 17:17:46 -0800428}