blob: 44d304176b3d9d4e088e56f6cf00e8c06b40e847 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.ArrayList;
6import java.util.Arrays;
7import java.util.List;
8
9import net.kuujo.copycat.Copycat;
10import net.kuujo.copycat.StateMachine;
11import net.kuujo.copycat.cluster.TcpCluster;
12import net.kuujo.copycat.cluster.TcpClusterConfig;
13import net.kuujo.copycat.cluster.TcpMember;
14import net.kuujo.copycat.log.ChronicleLog;
15import net.kuujo.copycat.log.Log;
16
17import org.apache.felix.scr.annotations.Activate;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080018import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080019import org.apache.felix.scr.annotations.Reference;
20import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani08822c42014-11-04 17:17:46 -080021import org.onlab.onos.cluster.ClusterService;
22import org.onlab.onos.cluster.ControllerNode;
23import org.onlab.onos.store.service.DatabaseAdminService;
24import org.onlab.onos.store.service.DatabaseException;
25import org.onlab.onos.store.service.DatabaseService;
26import org.onlab.onos.store.service.NoSuchTableException;
27import org.onlab.onos.store.service.OptimisticLockException;
28import org.onlab.onos.store.service.OptionalResult;
29import org.onlab.onos.store.service.PreconditionFailedException;
30import org.onlab.onos.store.service.ReadRequest;
31import org.onlab.onos.store.service.ReadResult;
32import org.onlab.onos.store.service.WriteAborted;
33import org.onlab.onos.store.service.WriteRequest;
34import org.onlab.onos.store.service.WriteResult;
35import org.slf4j.Logger;
36
37import com.google.common.collect.Lists;
38
39/**
40 * Strongly consistent and durable state management service based on
41 * Copycat implementation of Raft consensus protocol.
42 */
Yuta HIGUCHI280ed8a2014-11-04 17:51:50 -080043//@Component(immediate = true)
44//@Service
Madan Jampani08822c42014-11-04 17:17:46 -080045public class DatabaseManager implements DatabaseService, DatabaseAdminService {
46
47 private final Logger log = getLogger(getClass());
48
49 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
50 ClusterService clusterService;
51
Madan Jampani9b19a822014-11-04 21:37:13 -080052 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
53 ClusterMessagingProtocol copycatMessagingProtocol;
54
Madan Jampani08822c42014-11-04 17:17:46 -080055 public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
56
57 private Copycat copycat;
58 private DatabaseClient client;
59
60 @Activate
61 public void activate() {
62 TcpMember localMember =
63 new TcpMember(
64 clusterService.getLocalNode().ip().toString(),
Madan Jampani9b19a822014-11-04 21:37:13 -080065 clusterService.getLocalNode().tcpPort());
Madan Jampani08822c42014-11-04 17:17:46 -080066 List<TcpMember> remoteMembers = Lists.newArrayList();
67
68 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani9b19a822014-11-04 21:37:13 -080069 TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
Madan Jampani08822c42014-11-04 17:17:46 -080070 if (!member.equals(localMember)) {
71 remoteMembers.add(member);
72 }
73 }
74
75 // Configure the cluster.
76 TcpClusterConfig config = new TcpClusterConfig();
77
78 config.setLocalMember(localMember);
79 config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
80
81 // Create the cluster.
82 TcpCluster cluster = new TcpCluster(config);
83
84 StateMachine stateMachine = new DatabaseStateMachine();
85 ControllerNode thisNode = clusterService.getLocalNode();
86 Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
87
Madan Jampani9b19a822014-11-04 21:37:13 -080088 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampani08822c42014-11-04 17:17:46 -080089 copycat.start();
90
Madan Jampani9b19a822014-11-04 21:37:13 -080091 client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
Madan Jampani08822c42014-11-04 17:17:46 -080092
93 log.info("Started.");
94 }
95
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080096 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -080097 public void deactivate() {
98 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080099 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800100 }
101
102 @Override
103 public boolean createTable(String name) {
104 return client.createTable(name);
105 }
106
107 @Override
108 public void dropTable(String name) {
109 client.dropTable(name);
110 }
111
112 @Override
113 public void dropAllTables() {
114 client.dropAllTables();
115 }
116
117 @Override
118 public List<String> listTables() {
119 return client.listTables();
120 }
121
122 @Override
123 public ReadResult read(ReadRequest request) {
124 return batchRead(Arrays.asList(request)).get(0).get();
125 }
126
127 @Override
128 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
129 List<ReadRequest> batch) {
130 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
131 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
132 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
133 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
134 new NoSuchTableException()));
135 } else {
136 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
137 internalReadResult.result()));
138 }
139 }
140 return readResults;
141 }
142
143 @Override
144 public WriteResult write(WriteRequest request) {
145 return batchWrite(Arrays.asList(request)).get(0).get();
146 }
147
148 @Override
149 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
150 List<WriteRequest> batch) {
151 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
152 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
153 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
154 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
155 new NoSuchTableException()));
156 } else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
157 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
158 new OptimisticLockException()));
159 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
160 // TODO: throw a different exception?
161 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
162 new PreconditionFailedException()));
163 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
164 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
165 new WriteAborted()));
166 } else {
167 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
168 internalWriteResult.result()));
169 }
170 }
171 return writeResults;
172
173 }
174
175 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
176
177 private final R result;
178 private final DatabaseException exception;
179
180 public DatabaseOperationResult(R result) {
181 this.result = result;
182 this.exception = null;
183 }
184
185 public DatabaseOperationResult(DatabaseException exception) {
186 this.result = null;
187 this.exception = exception;
188 }
189
190 @Override
191 public R get() {
192 if (result != null) {
193 return result;
194 }
195 throw exception;
196 }
197
198 @Override
199 public boolean hasValidResult() {
200 return result != null;
201 }
202
203 @Override
204 public String toString() {
205 if (result != null) {
206 return result.toString();
207 } else {
208 return exception.toString();
209 }
210 }
211 }
212}