blob: 0d06e0828400bedadfa12523cb0af619dff18cac [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08005import java.io.File;
6import java.io.IOException;
Madan Jampani08822c42014-11-04 17:17:46 -08007import java.util.ArrayList;
8import java.util.Arrays;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -08009import java.util.Collection;
10import java.util.Collections;
11import java.util.HashSet;
Madan Jampani08822c42014-11-04 17:17:46 -080012import java.util.List;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080013import java.util.Map;
14import java.util.Set;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080015import java.util.concurrent.CountDownLatch;
16import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080017
18import net.kuujo.copycat.Copycat;
19import net.kuujo.copycat.StateMachine;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080020import net.kuujo.copycat.cluster.ClusterConfig;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080021import net.kuujo.copycat.cluster.Member;
Madan Jampani08822c42014-11-04 17:17:46 -080022import net.kuujo.copycat.cluster.TcpCluster;
23import net.kuujo.copycat.cluster.TcpClusterConfig;
24import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080025import net.kuujo.copycat.log.InMemoryLog;
Madan Jampani08822c42014-11-04 17:17:46 -080026import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080027
Madan Jampani08822c42014-11-04 17:17:46 -080028import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080029import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080030import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080031import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080033import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080034import org.onlab.onos.cluster.ClusterEvent;
35import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080036import org.onlab.onos.cluster.ClusterService;
37import org.onlab.onos.cluster.ControllerNode;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080038import org.onlab.onos.cluster.DefaultControllerNode;
39import org.onlab.onos.cluster.NodeId;
Madan Jampani08822c42014-11-04 17:17:46 -080040import org.onlab.onos.store.service.DatabaseAdminService;
41import org.onlab.onos.store.service.DatabaseException;
42import org.onlab.onos.store.service.DatabaseService;
43import org.onlab.onos.store.service.NoSuchTableException;
44import org.onlab.onos.store.service.OptimisticLockException;
45import org.onlab.onos.store.service.OptionalResult;
Madan Jampani08822c42014-11-04 17:17:46 -080046import org.onlab.onos.store.service.ReadRequest;
47import org.onlab.onos.store.service.ReadResult;
48import org.onlab.onos.store.service.WriteAborted;
49import org.onlab.onos.store.service.WriteRequest;
50import org.onlab.onos.store.service.WriteResult;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080051import org.onlab.packet.IpAddress;
Madan Jampani08822c42014-11-04 17:17:46 -080052import org.slf4j.Logger;
53
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080054import com.google.common.collect.ImmutableList;
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080055
/**
 * Strongly consistent and durable state management service based on
 * the Copycat implementation of the Raft consensus protocol.
 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080060@Component(immediate = true)
61@Service
Madan Jampani08822c42014-11-04 17:17:46 -080062public class DatabaseManager implements DatabaseService, DatabaseAdminService {
63
64 private final Logger log = getLogger(getClass());
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080067 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080068
Madan Jampani9b19a822014-11-04 21:37:13 -080069 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080070 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080071
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080072 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log_";
73
74 // Current working dir seems to be /opt/onos/apache-karaf-3.0.2
75 // TODO: Get the path to /opt/onos/config
76 private static final String CONFIG_DIR = "../config";
77
78 private static final String DEFAULT_MEMBER_FILE = "tablets.json";
79
80 private static final String DEFAULT_TABLET = "default";
81
82 // TODO: make this configurable
83 // initial member configuration file path
84 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
Madan Jampani08822c42014-11-04 17:17:46 -080085
86 private Copycat copycat;
87 private DatabaseClient client;
88
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080089 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080090 private ClusterConfig<TcpMember> clusterConfig;
91
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080092 private CountDownLatch clusterEventLatch;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080093 private ClusterEventListener clusterEventListener;
94
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -080095 private Map<String, Set<DefaultControllerNode>> tabletMembers;
96
97 private boolean autoAddMember = false;
98
Madan Jampani08822c42014-11-04 17:17:46 -080099 @Activate
100 public void activate() {
Madan Jampanidfbfa182014-11-04 22:06:41 -0800101
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800102 // TODO: Not every node should be part of the consensus ring.
Madan Jampanidfbfa182014-11-04 22:06:41 -0800103
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800104 // load tablet configuration
105 File file = new File(CONFIG_DIR, initialMemberConfig);
106 log.info("Loading config: {}", file.getAbsolutePath());
107 TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
108 try {
109 tabletMembers = tabletDef.read();
110 } catch (IOException e) {
111 log.error("Failed to load tablet config {}", file);
112 throw new IllegalStateException("Failed to load tablet config", e);
113 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800114
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800115 // load default tablet configuration and start copycat
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800116 clusterConfig = new TcpClusterConfig();
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800117 Set<DefaultControllerNode> defaultMember = tabletMembers.get(DEFAULT_TABLET);
118 if (defaultMember == null || defaultMember.isEmpty()) {
119 log.error("No member found in [{}] tablet configuration.",
120 DEFAULT_TABLET);
121 throw new IllegalStateException("No member found in tablet configuration");
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800122
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800123 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800124
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800125 final ControllerNode localNode = clusterService.getLocalNode();
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800126 for (ControllerNode member : defaultMember) {
127 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
128 member.tcpPort());
129 if (localNode.equals(member)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800130 clusterConfig.setLocalMember(tcpMember);
131 } else {
132 clusterConfig.addRemoteMember(tcpMember);
133 }
134 }
135
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800136 // note: from this point beyond, clusterConfig requires synchronization
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800137 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800138 clusterEventListener = new InternalClusterEventListener();
139 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800140
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800141 if (clusterService.getNodes().size() < clusterConfig.getMembers().size()) {
142 // current cluster size smaller then expected
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800143 try {
144 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800145 log.info("Starting with {}/{} nodes cluster",
146 clusterService.getNodes().size(),
147 clusterConfig.getMembers().size());
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800148 }
149 } catch (InterruptedException e) {
150 log.info("Interrupted waiting for others", e);
151 }
152 }
Madan Jampani08822c42014-11-04 17:17:46 -0800153
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800154 final TcpCluster cluster;
155 synchronized (clusterConfig) {
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800156 // Create the cluster.
157 cluster = new TcpCluster(clusterConfig);
158 }
159 log.info("Starting cluster: {}", cluster);
160
Madan Jampani08822c42014-11-04 17:17:46 -0800161
162 StateMachine stateMachine = new DatabaseStateMachine();
Madan Jampani2ee20002014-11-06 20:06:12 -0800163 Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800164 ClusterMessagingProtocol.SERIALIZER);
Madan Jampani08822c42014-11-04 17:17:46 -0800165
Madan Jampani9b19a822014-11-04 21:37:13 -0800166 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampani08822c42014-11-04 17:17:46 -0800167 copycat.start();
168
Yuta HIGUCHIf8468442014-11-11 10:09:20 -0800169 client = new DatabaseClient(copycat);
Madan Jampani08822c42014-11-04 17:17:46 -0800170
171 log.info("Started.");
172 }
173
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800174 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800175 public void deactivate() {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800176 clusterService.removeListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800177 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800178 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800179 }
180
181 @Override
182 public boolean createTable(String name) {
183 return client.createTable(name);
184 }
185
186 @Override
187 public void dropTable(String name) {
188 client.dropTable(name);
189 }
190
191 @Override
192 public void dropAllTables() {
193 client.dropAllTables();
194 }
195
196 @Override
197 public List<String> listTables() {
198 return client.listTables();
199 }
200
201 @Override
202 public ReadResult read(ReadRequest request) {
203 return batchRead(Arrays.asList(request)).get(0).get();
204 }
205
206 @Override
207 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
208 List<ReadRequest> batch) {
209 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
210 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
211 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
212 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
213 new NoSuchTableException()));
214 } else {
215 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
216 internalReadResult.result()));
217 }
218 }
219 return readResults;
220 }
221
222 @Override
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800223 public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
224 return batchWrite(Arrays.asList(request)).get(0);
225 }
226
227 @Override
Yuta HIGUCHIc6b8f612014-11-06 19:04:13 -0800228 public WriteResult write(WriteRequest request) {
229// throws OptimisticLockException, PreconditionFailedException {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800230 return writeNothrow(request).get();
Madan Jampani08822c42014-11-04 17:17:46 -0800231 }
232
233 @Override
234 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
235 List<WriteRequest> batch) {
236 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
237 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
238 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
239 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
240 new NoSuchTableException()));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800241 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
Madan Jampani08822c42014-11-04 17:17:46 -0800242 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
243 new OptimisticLockException()));
244 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
245 // TODO: throw a different exception?
246 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800247 new OptimisticLockException()));
Madan Jampani08822c42014-11-04 17:17:46 -0800248 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
249 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
250 new WriteAborted()));
251 } else {
252 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
253 internalWriteResult.result()));
254 }
255 }
256 return writeResults;
257
258 }
259
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800260 private final class InternalClusterEventListener
261 implements ClusterEventListener {
262
263 @Override
264 public void event(ClusterEvent event) {
265 // TODO: Not every node should be part of the consensus ring.
266
267 final ControllerNode node = event.subject();
268 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
269 node.tcpPort());
270
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800271 switch (event.type()) {
272 case INSTANCE_ACTIVATED:
273 case INSTANCE_ADDED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800274 if (autoAddMember) {
275 synchronized (clusterConfig) {
276 if (!clusterConfig.getMembers().contains(tcpMember)) {
277 log.info("{} was automatically added to the cluster", tcpMember);
278 clusterConfig.addRemoteMember(tcpMember);
279 }
280 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800281 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800282 break;
283 case INSTANCE_DEACTIVATED:
284 case INSTANCE_REMOVED:
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800285 if (autoAddMember) {
286 Set<DefaultControllerNode> members
287 = tabletMembers.getOrDefault(DEFAULT_TABLET,
288 Collections.emptySet());
289 // remove only if not the initial members
290 if (!members.contains(node)) {
291 synchronized (clusterConfig) {
292 if (clusterConfig.getMembers().contains(tcpMember)) {
293 log.info("{} was automatically removed from the cluster", tcpMember);
294 clusterConfig.removeRemoteMember(tcpMember);
295 }
296 }
297 }
298 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800299 break;
300 default:
301 break;
302 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800303 if (copycat != null) {
304 log.debug("Current cluster: {}", copycat.cluster());
305 }
306 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800307 }
308
309 }
310
311 public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
312 public KryoRegisteredInMemoryLog() {
313 super();
314 // required to deserialize object across bundles
315 super.kryo.register(TcpMember.class, new TcpMemberSerializer());
316 super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
317 }
318 }
319
Madan Jampani08822c42014-11-04 17:17:46 -0800320 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
321
322 private final R result;
323 private final DatabaseException exception;
324
325 public DatabaseOperationResult(R result) {
326 this.result = result;
327 this.exception = null;
328 }
329
330 public DatabaseOperationResult(DatabaseException exception) {
331 this.result = null;
332 this.exception = exception;
333 }
334
335 @Override
336 public R get() {
337 if (result != null) {
338 return result;
339 }
340 throw exception;
341 }
342
343 @Override
344 public boolean hasValidResult() {
345 return result != null;
346 }
347
348 @Override
349 public String toString() {
350 if (result != null) {
351 return result.toString();
352 } else {
353 return exception.toString();
354 }
355 }
356 }
Yuta HIGUCHI60731cb2014-11-11 01:34:46 -0800357
358 @Override
359 public void addMember(final ControllerNode node) {
360 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
361 node.tcpPort());
362 log.info("{} was added to the cluster", tcpMember);
363 synchronized (clusterConfig) {
364 clusterConfig.addRemoteMember(tcpMember);
365 }
366 }
367
368 @Override
369 public void removeMember(final ControllerNode node) {
370 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
371 node.tcpPort());
372 log.info("{} was removed from the cluster", tcpMember);
373 synchronized (clusterConfig) {
374 clusterConfig.removeRemoteMember(tcpMember);
375 }
376 }
377
378 @Override
379 public Collection<ControllerNode> listMembers() {
380 if (copycat == null) {
381 return ImmutableList.of();
382 }
383 Set<ControllerNode> members = new HashSet<>();
384 for (Member member : copycat.cluster().members()) {
385 if (member instanceof TcpMember) {
386 final TcpMember tcpMember = (TcpMember) member;
387 // TODO assuming tcpMember#host to be IP address,
388 // but if not lookup DNS, etc. first
389 IpAddress ip = IpAddress.valueOf(tcpMember.host());
390 int tcpPort = tcpMember.port();
391 NodeId id = getNodeIdFromIp(ip, tcpPort);
392 if (id == null) {
393 log.info("No NodeId found for {}:{}", ip, tcpPort);
394 continue;
395 }
396 members.add(new DefaultControllerNode(id, ip, tcpPort));
397 }
398 }
399 return members;
400 }
401
402 private NodeId getNodeIdFromIp(IpAddress ip, int tcpPort) {
403 for (ControllerNode node : clusterService.getNodes()) {
404 if (node.ip().equals(ip) &&
405 node.tcpPort() == tcpPort) {
406 return node.id();
407 }
408 }
409 return null;
410 }
Madan Jampani08822c42014-11-04 17:17:46 -0800411}