blob: 8042cc6893c230cf3b4aadd8c5e7853065d3fab2 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.ArrayList;
6import java.util.Arrays;
7import java.util.List;
8
9import net.kuujo.copycat.Copycat;
10import net.kuujo.copycat.StateMachine;
11import net.kuujo.copycat.cluster.TcpCluster;
12import net.kuujo.copycat.cluster.TcpClusterConfig;
13import net.kuujo.copycat.cluster.TcpMember;
14import net.kuujo.copycat.log.ChronicleLog;
15import net.kuujo.copycat.log.Log;
16
17import org.apache.felix.scr.annotations.Activate;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080018import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080019import org.apache.felix.scr.annotations.Reference;
20import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani08822c42014-11-04 17:17:46 -080021import org.onlab.netty.Endpoint;
22import org.onlab.onos.cluster.ClusterService;
23import org.onlab.onos.cluster.ControllerNode;
24import org.onlab.onos.store.service.DatabaseAdminService;
25import org.onlab.onos.store.service.DatabaseException;
26import org.onlab.onos.store.service.DatabaseService;
27import org.onlab.onos.store.service.NoSuchTableException;
28import org.onlab.onos.store.service.OptimisticLockException;
29import org.onlab.onos.store.service.OptionalResult;
30import org.onlab.onos.store.service.PreconditionFailedException;
31import org.onlab.onos.store.service.ReadRequest;
32import org.onlab.onos.store.service.ReadResult;
33import org.onlab.onos.store.service.WriteAborted;
34import org.onlab.onos.store.service.WriteRequest;
35import org.onlab.onos.store.service.WriteResult;
36import org.slf4j.Logger;
37
38import com.google.common.collect.Lists;
39
40/**
41 * Strongly consistent and durable state management service based on
42 * Copycat implementation of Raft consensus protocol.
43 */
Yuta HIGUCHI280ed8a2014-11-04 17:51:50 -080044//@Component(immediate = true)
45//@Service
Madan Jampani08822c42014-11-04 17:17:46 -080046public class DatabaseManager implements DatabaseService, DatabaseAdminService {
47
48 private final Logger log = getLogger(getClass());
49
50 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
51 ClusterService clusterService;
52
53 public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
54
55 private Copycat copycat;
56 private DatabaseClient client;
57
58 @Activate
59 public void activate() {
Yuta HIGUCHI3a3ac962014-11-04 18:05:08 -080060 // FIXME hack tcpPort +1 for copycat communication
Madan Jampani08822c42014-11-04 17:17:46 -080061 TcpMember localMember =
62 new TcpMember(
63 clusterService.getLocalNode().ip().toString(),
Yuta HIGUCHI3a3ac962014-11-04 18:05:08 -080064 clusterService.getLocalNode().tcpPort() + 1);
Madan Jampani08822c42014-11-04 17:17:46 -080065 List<TcpMember> remoteMembers = Lists.newArrayList();
66
67 for (ControllerNode node : clusterService.getNodes()) {
Yuta HIGUCHI3a3ac962014-11-04 18:05:08 -080068 TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort() + 1);
Madan Jampani08822c42014-11-04 17:17:46 -080069 if (!member.equals(localMember)) {
70 remoteMembers.add(member);
71 }
72 }
73
74 // Configure the cluster.
75 TcpClusterConfig config = new TcpClusterConfig();
76
77 config.setLocalMember(localMember);
78 config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
79
80 // Create the cluster.
81 TcpCluster cluster = new TcpCluster(config);
82
83 StateMachine stateMachine = new DatabaseStateMachine();
84 ControllerNode thisNode = clusterService.getLocalNode();
85 Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
86
87 copycat = new Copycat(stateMachine, consensusLog, cluster, new NettyProtocol());
88 copycat.start();
89
90 client = new DatabaseClient(new Endpoint(localMember.host(), localMember.port()));
91
92 log.info("Started.");
93 }
94
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080095 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -080096 public void deactivate() {
97 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080098 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -080099 }
100
101 @Override
102 public boolean createTable(String name) {
103 return client.createTable(name);
104 }
105
106 @Override
107 public void dropTable(String name) {
108 client.dropTable(name);
109 }
110
111 @Override
112 public void dropAllTables() {
113 client.dropAllTables();
114 }
115
116 @Override
117 public List<String> listTables() {
118 return client.listTables();
119 }
120
121 @Override
122 public ReadResult read(ReadRequest request) {
123 return batchRead(Arrays.asList(request)).get(0).get();
124 }
125
126 @Override
127 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
128 List<ReadRequest> batch) {
129 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
130 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
131 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
132 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
133 new NoSuchTableException()));
134 } else {
135 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
136 internalReadResult.result()));
137 }
138 }
139 return readResults;
140 }
141
142 @Override
143 public WriteResult write(WriteRequest request) {
144 return batchWrite(Arrays.asList(request)).get(0).get();
145 }
146
147 @Override
148 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
149 List<WriteRequest> batch) {
150 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
151 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
152 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
153 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
154 new NoSuchTableException()));
155 } else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
156 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
157 new OptimisticLockException()));
158 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
159 // TODO: throw a different exception?
160 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
161 new PreconditionFailedException()));
162 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
163 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
164 new WriteAborted()));
165 } else {
166 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
167 internalWriteResult.result()));
168 }
169 }
170 return writeResults;
171
172 }
173
174 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
175
176 private final R result;
177 private final DatabaseException exception;
178
179 public DatabaseOperationResult(R result) {
180 this.result = result;
181 this.exception = null;
182 }
183
184 public DatabaseOperationResult(DatabaseException exception) {
185 this.result = null;
186 this.exception = exception;
187 }
188
189 @Override
190 public R get() {
191 if (result != null) {
192 return result;
193 }
194 throw exception;
195 }
196
197 @Override
198 public boolean hasValidResult() {
199 return result != null;
200 }
201
202 @Override
203 public String toString() {
204 if (result != null) {
205 return result.toString();
206 } else {
207 return exception.toString();
208 }
209 }
210 }
211}