blob: a7d7075493747636d4356fab70346bf5feac6903 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.ArrayList;
6import java.util.Arrays;
7import java.util.List;
8
9import net.kuujo.copycat.Copycat;
10import net.kuujo.copycat.StateMachine;
11import net.kuujo.copycat.cluster.TcpCluster;
12import net.kuujo.copycat.cluster.TcpClusterConfig;
13import net.kuujo.copycat.cluster.TcpMember;
14import net.kuujo.copycat.log.ChronicleLog;
15import net.kuujo.copycat.log.Log;
16
17import org.apache.felix.scr.annotations.Activate;
Madan Jampani08822c42014-11-04 17:17:46 -080018import org.apache.felix.scr.annotations.Reference;
19import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani08822c42014-11-04 17:17:46 -080020import org.onlab.netty.Endpoint;
21import org.onlab.onos.cluster.ClusterService;
22import org.onlab.onos.cluster.ControllerNode;
23import org.onlab.onos.store.service.DatabaseAdminService;
24import org.onlab.onos.store.service.DatabaseException;
25import org.onlab.onos.store.service.DatabaseService;
26import org.onlab.onos.store.service.NoSuchTableException;
27import org.onlab.onos.store.service.OptimisticLockException;
28import org.onlab.onos.store.service.OptionalResult;
29import org.onlab.onos.store.service.PreconditionFailedException;
30import org.onlab.onos.store.service.ReadRequest;
31import org.onlab.onos.store.service.ReadResult;
32import org.onlab.onos.store.service.WriteAborted;
33import org.onlab.onos.store.service.WriteRequest;
34import org.onlab.onos.store.service.WriteResult;
35import org.slf4j.Logger;
36
37import com.google.common.collect.Lists;
38
39/**
40 * Strongly consistent and durable state management service based on
41 * Copycat implementation of Raft consensus protocol.
42 */
Yuta HIGUCHI280ed8a2014-11-04 17:51:50 -080043//@Component(immediate = true)
44//@Service
Madan Jampani08822c42014-11-04 17:17:46 -080045public class DatabaseManager implements DatabaseService, DatabaseAdminService {
46
47 private final Logger log = getLogger(getClass());
48
49 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
50 ClusterService clusterService;
51
52 public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
53
54 private Copycat copycat;
55 private DatabaseClient client;
56
57 @Activate
58 public void activate() {
Yuta HIGUCHI3a3ac962014-11-04 18:05:08 -080059 // FIXME hack tcpPort +1 for copycat communication
Madan Jampani08822c42014-11-04 17:17:46 -080060 TcpMember localMember =
61 new TcpMember(
62 clusterService.getLocalNode().ip().toString(),
Yuta HIGUCHI3a3ac962014-11-04 18:05:08 -080063 clusterService.getLocalNode().tcpPort() + 1);
Madan Jampani08822c42014-11-04 17:17:46 -080064 List<TcpMember> remoteMembers = Lists.newArrayList();
65
66 for (ControllerNode node : clusterService.getNodes()) {
Yuta HIGUCHI3a3ac962014-11-04 18:05:08 -080067 TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort() + 1);
Madan Jampani08822c42014-11-04 17:17:46 -080068 if (!member.equals(localMember)) {
69 remoteMembers.add(member);
70 }
71 }
72
73 // Configure the cluster.
74 TcpClusterConfig config = new TcpClusterConfig();
75
76 config.setLocalMember(localMember);
77 config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
78
79 // Create the cluster.
80 TcpCluster cluster = new TcpCluster(config);
81
82 StateMachine stateMachine = new DatabaseStateMachine();
83 ControllerNode thisNode = clusterService.getLocalNode();
84 Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
85
86 copycat = new Copycat(stateMachine, consensusLog, cluster, new NettyProtocol());
87 copycat.start();
88
89 client = new DatabaseClient(new Endpoint(localMember.host(), localMember.port()));
90
91 log.info("Started.");
92 }
93
94 @Activate
95 public void deactivate() {
96 copycat.stop();
97 }
98
99 @Override
100 public boolean createTable(String name) {
101 return client.createTable(name);
102 }
103
104 @Override
105 public void dropTable(String name) {
106 client.dropTable(name);
107 }
108
109 @Override
110 public void dropAllTables() {
111 client.dropAllTables();
112 }
113
114 @Override
115 public List<String> listTables() {
116 return client.listTables();
117 }
118
119 @Override
120 public ReadResult read(ReadRequest request) {
121 return batchRead(Arrays.asList(request)).get(0).get();
122 }
123
124 @Override
125 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
126 List<ReadRequest> batch) {
127 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
128 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
129 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
130 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
131 new NoSuchTableException()));
132 } else {
133 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
134 internalReadResult.result()));
135 }
136 }
137 return readResults;
138 }
139
140 @Override
141 public WriteResult write(WriteRequest request) {
142 return batchWrite(Arrays.asList(request)).get(0).get();
143 }
144
145 @Override
146 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
147 List<WriteRequest> batch) {
148 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
149 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
150 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
151 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
152 new NoSuchTableException()));
153 } else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
154 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
155 new OptimisticLockException()));
156 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
157 // TODO: throw a different exception?
158 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
159 new PreconditionFailedException()));
160 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
161 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
162 new WriteAborted()));
163 } else {
164 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
165 internalWriteResult.result()));
166 }
167 }
168 return writeResults;
169
170 }
171
172 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
173
174 private final R result;
175 private final DatabaseException exception;
176
177 public DatabaseOperationResult(R result) {
178 this.result = result;
179 this.exception = null;
180 }
181
182 public DatabaseOperationResult(DatabaseException exception) {
183 this.result = null;
184 this.exception = exception;
185 }
186
187 @Override
188 public R get() {
189 if (result != null) {
190 return result;
191 }
192 throw exception;
193 }
194
195 @Override
196 public boolean hasValidResult() {
197 return result != null;
198 }
199
200 @Override
201 public String toString() {
202 if (result != null) {
203 return result.toString();
204 } else {
205 return exception.toString();
206 }
207 }
208 }
209}