blob: 183a6dbb78d3f56d61d2b9e5cb6cd356d2349219 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.ArrayList;
6import java.util.Arrays;
7import java.util.List;
8
9import net.kuujo.copycat.Copycat;
10import net.kuujo.copycat.StateMachine;
11import net.kuujo.copycat.cluster.TcpCluster;
12import net.kuujo.copycat.cluster.TcpClusterConfig;
13import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080014import net.kuujo.copycat.log.InMemoryLog;
Madan Jampani08822c42014-11-04 17:17:46 -080015import net.kuujo.copycat.log.Log;
Madan Jampani08822c42014-11-04 17:17:46 -080016import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080017import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080018import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080019import org.apache.felix.scr.annotations.Reference;
20import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080021import org.apache.felix.scr.annotations.Service;
Madan Jampani08822c42014-11-04 17:17:46 -080022import org.onlab.onos.cluster.ClusterService;
23import org.onlab.onos.cluster.ControllerNode;
24import org.onlab.onos.store.service.DatabaseAdminService;
25import org.onlab.onos.store.service.DatabaseException;
26import org.onlab.onos.store.service.DatabaseService;
27import org.onlab.onos.store.service.NoSuchTableException;
28import org.onlab.onos.store.service.OptimisticLockException;
29import org.onlab.onos.store.service.OptionalResult;
30import org.onlab.onos.store.service.PreconditionFailedException;
31import org.onlab.onos.store.service.ReadRequest;
32import org.onlab.onos.store.service.ReadResult;
33import org.onlab.onos.store.service.WriteAborted;
34import org.onlab.onos.store.service.WriteRequest;
35import org.onlab.onos.store.service.WriteResult;
36import org.slf4j.Logger;
37
38import com.google.common.collect.Lists;
39
40/**
41 * Strongly consistent and durable state management service based on
42 * Copycat implementation of Raft consensus protocol.
43 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080044@Component(immediate = true)
45@Service
Madan Jampani08822c42014-11-04 17:17:46 -080046public class DatabaseManager implements DatabaseService, DatabaseAdminService {
47
48 private final Logger log = getLogger(getClass());
49
50 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080051 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080052
Madan Jampani9b19a822014-11-04 21:37:13 -080053 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080054 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080055
Madan Jampani08822c42014-11-04 17:17:46 -080056 public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
57
58 private Copycat copycat;
59 private DatabaseClient client;
60
61 @Activate
62 public void activate() {
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080063 log.info("Starting.");
Madan Jampanidfbfa182014-11-04 22:06:41 -080064
65 // TODO: Not every node can be part of the consensus ring.
66
Madan Jampani08822c42014-11-04 17:17:46 -080067 TcpMember localMember =
68 new TcpMember(
69 clusterService.getLocalNode().ip().toString(),
Madan Jampani9b19a822014-11-04 21:37:13 -080070 clusterService.getLocalNode().tcpPort());
Madan Jampani08822c42014-11-04 17:17:46 -080071 List<TcpMember> remoteMembers = Lists.newArrayList();
72
73 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani9b19a822014-11-04 21:37:13 -080074 TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
Madan Jampani08822c42014-11-04 17:17:46 -080075 if (!member.equals(localMember)) {
76 remoteMembers.add(member);
77 }
78 }
79
80 // Configure the cluster.
81 TcpClusterConfig config = new TcpClusterConfig();
82
83 config.setLocalMember(localMember);
84 config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
85
86 // Create the cluster.
87 TcpCluster cluster = new TcpCluster(config);
88
89 StateMachine stateMachine = new DatabaseStateMachine();
90 ControllerNode thisNode = clusterService.getLocalNode();
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080091 // FIXME resolve Chronicle + OSGi issue
92 //Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
93 Log consensusLog = new InMemoryLog();
Madan Jampani08822c42014-11-04 17:17:46 -080094
Madan Jampani9b19a822014-11-04 21:37:13 -080095 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampani08822c42014-11-04 17:17:46 -080096 copycat.start();
97
Madan Jampani9b19a822014-11-04 21:37:13 -080098 client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
Madan Jampani08822c42014-11-04 17:17:46 -080099
100 log.info("Started.");
101 }
102
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800103 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800104 public void deactivate() {
105 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800106 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800107 }
108
109 @Override
110 public boolean createTable(String name) {
111 return client.createTable(name);
112 }
113
114 @Override
115 public void dropTable(String name) {
116 client.dropTable(name);
117 }
118
119 @Override
120 public void dropAllTables() {
121 client.dropAllTables();
122 }
123
124 @Override
125 public List<String> listTables() {
126 return client.listTables();
127 }
128
129 @Override
130 public ReadResult read(ReadRequest request) {
131 return batchRead(Arrays.asList(request)).get(0).get();
132 }
133
134 @Override
135 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
136 List<ReadRequest> batch) {
137 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
138 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
139 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
140 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
141 new NoSuchTableException()));
142 } else {
143 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
144 internalReadResult.result()));
145 }
146 }
147 return readResults;
148 }
149
150 @Override
151 public WriteResult write(WriteRequest request) {
152 return batchWrite(Arrays.asList(request)).get(0).get();
153 }
154
155 @Override
156 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
157 List<WriteRequest> batch) {
158 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
159 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
160 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
161 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
162 new NoSuchTableException()));
163 } else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
164 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
165 new OptimisticLockException()));
166 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
167 // TODO: throw a different exception?
168 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
169 new PreconditionFailedException()));
170 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
171 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
172 new WriteAborted()));
173 } else {
174 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
175 internalWriteResult.result()));
176 }
177 }
178 return writeResults;
179
180 }
181
182 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
183
184 private final R result;
185 private final DatabaseException exception;
186
187 public DatabaseOperationResult(R result) {
188 this.result = result;
189 this.exception = null;
190 }
191
192 public DatabaseOperationResult(DatabaseException exception) {
193 this.result = null;
194 this.exception = exception;
195 }
196
197 @Override
198 public R get() {
199 if (result != null) {
200 return result;
201 }
202 throw exception;
203 }
204
205 @Override
206 public boolean hasValidResult() {
207 return result != null;
208 }
209
210 @Override
211 public String toString() {
212 if (result != null) {
213 return result.toString();
214 } else {
215 return exception.toString();
216 }
217 }
218 }
219}