blob: 7db4bc7f94e6a4820cd8ed5d74b279d3262f9d70 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.ArrayList;
6import java.util.Arrays;
7import java.util.List;
8
9import net.kuujo.copycat.Copycat;
10import net.kuujo.copycat.StateMachine;
11import net.kuujo.copycat.cluster.TcpCluster;
12import net.kuujo.copycat.cluster.TcpClusterConfig;
13import net.kuujo.copycat.cluster.TcpMember;
14import net.kuujo.copycat.log.ChronicleLog;
15import net.kuujo.copycat.log.Log;
16
17import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080018import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080019import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080020import org.apache.felix.scr.annotations.Reference;
21import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080022import org.apache.felix.scr.annotations.Service;
Madan Jampani08822c42014-11-04 17:17:46 -080023import org.onlab.onos.cluster.ClusterService;
24import org.onlab.onos.cluster.ControllerNode;
25import org.onlab.onos.store.service.DatabaseAdminService;
26import org.onlab.onos.store.service.DatabaseException;
27import org.onlab.onos.store.service.DatabaseService;
28import org.onlab.onos.store.service.NoSuchTableException;
29import org.onlab.onos.store.service.OptimisticLockException;
30import org.onlab.onos.store.service.OptionalResult;
31import org.onlab.onos.store.service.PreconditionFailedException;
32import org.onlab.onos.store.service.ReadRequest;
33import org.onlab.onos.store.service.ReadResult;
34import org.onlab.onos.store.service.WriteAborted;
35import org.onlab.onos.store.service.WriteRequest;
36import org.onlab.onos.store.service.WriteResult;
37import org.slf4j.Logger;
38
39import com.google.common.collect.Lists;
40
41/**
42 * Strongly consistent and durable state management service based on
43 * Copycat implementation of Raft consensus protocol.
44 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080045@Component(immediate = true)
46@Service
Madan Jampani08822c42014-11-04 17:17:46 -080047public class DatabaseManager implements DatabaseService, DatabaseAdminService {
48
49 private final Logger log = getLogger(getClass());
50
51 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080052 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080053
Madan Jampani9b19a822014-11-04 21:37:13 -080054 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080055 protected ClusterMessagingProtocol copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080056
Madan Jampani08822c42014-11-04 17:17:46 -080057 public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
58
59 private Copycat copycat;
60 private DatabaseClient client;
61
62 @Activate
63 public void activate() {
Madan Jampanidfbfa182014-11-04 22:06:41 -080064
65 // TODO: Not every node can be part of the consensus ring.
66
Madan Jampani08822c42014-11-04 17:17:46 -080067 TcpMember localMember =
68 new TcpMember(
69 clusterService.getLocalNode().ip().toString(),
Madan Jampani9b19a822014-11-04 21:37:13 -080070 clusterService.getLocalNode().tcpPort());
Madan Jampani08822c42014-11-04 17:17:46 -080071 List<TcpMember> remoteMembers = Lists.newArrayList();
72
73 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani9b19a822014-11-04 21:37:13 -080074 TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
Madan Jampani08822c42014-11-04 17:17:46 -080075 if (!member.equals(localMember)) {
76 remoteMembers.add(member);
77 }
78 }
79
80 // Configure the cluster.
81 TcpClusterConfig config = new TcpClusterConfig();
82
83 config.setLocalMember(localMember);
84 config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
85
86 // Create the cluster.
87 TcpCluster cluster = new TcpCluster(config);
88
89 StateMachine stateMachine = new DatabaseStateMachine();
90 ControllerNode thisNode = clusterService.getLocalNode();
91 Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
92
Madan Jampani9b19a822014-11-04 21:37:13 -080093 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampani08822c42014-11-04 17:17:46 -080094 copycat.start();
95
Madan Jampani9b19a822014-11-04 21:37:13 -080096 client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
Madan Jampani08822c42014-11-04 17:17:46 -080097
98 log.info("Started.");
99 }
100
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800101 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800102 public void deactivate() {
103 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800104 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800105 }
106
107 @Override
108 public boolean createTable(String name) {
109 return client.createTable(name);
110 }
111
112 @Override
113 public void dropTable(String name) {
114 client.dropTable(name);
115 }
116
117 @Override
118 public void dropAllTables() {
119 client.dropAllTables();
120 }
121
122 @Override
123 public List<String> listTables() {
124 return client.listTables();
125 }
126
127 @Override
128 public ReadResult read(ReadRequest request) {
129 return batchRead(Arrays.asList(request)).get(0).get();
130 }
131
132 @Override
133 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
134 List<ReadRequest> batch) {
135 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
136 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
137 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
138 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
139 new NoSuchTableException()));
140 } else {
141 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
142 internalReadResult.result()));
143 }
144 }
145 return readResults;
146 }
147
148 @Override
149 public WriteResult write(WriteRequest request) {
150 return batchWrite(Arrays.asList(request)).get(0).get();
151 }
152
153 @Override
154 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
155 List<WriteRequest> batch) {
156 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
157 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
158 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
159 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
160 new NoSuchTableException()));
161 } else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
162 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
163 new OptimisticLockException()));
164 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
165 // TODO: throw a different exception?
166 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
167 new PreconditionFailedException()));
168 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
169 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
170 new WriteAborted()));
171 } else {
172 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
173 internalWriteResult.result()));
174 }
175 }
176 return writeResults;
177
178 }
179
180 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
181
182 private final R result;
183 private final DatabaseException exception;
184
185 public DatabaseOperationResult(R result) {
186 this.result = result;
187 this.exception = null;
188 }
189
190 public DatabaseOperationResult(DatabaseException exception) {
191 this.result = null;
192 this.exception = exception;
193 }
194
195 @Override
196 public R get() {
197 if (result != null) {
198 return result;
199 }
200 throw exception;
201 }
202
203 @Override
204 public boolean hasValidResult() {
205 return result != null;
206 }
207
208 @Override
209 public String toString() {
210 if (result != null) {
211 return result.toString();
212 } else {
213 return exception.toString();
214 }
215 }
216 }
217}