blob: 00ce12d8187ad19e69414e1dae597334f693bddd [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.ArrayList;
6import java.util.Arrays;
7import java.util.List;
8
9import net.kuujo.copycat.Copycat;
10import net.kuujo.copycat.StateMachine;
11import net.kuujo.copycat.cluster.TcpCluster;
12import net.kuujo.copycat.cluster.TcpClusterConfig;
13import net.kuujo.copycat.cluster.TcpMember;
14import net.kuujo.copycat.log.ChronicleLog;
15import net.kuujo.copycat.log.Log;
16
17import org.apache.felix.scr.annotations.Activate;
18import org.apache.felix.scr.annotations.Component;
19import org.apache.felix.scr.annotations.Reference;
20import org.apache.felix.scr.annotations.ReferenceCardinality;
21import org.apache.felix.scr.annotations.Service;
22import org.onlab.netty.Endpoint;
23import org.onlab.onos.cluster.ClusterService;
24import org.onlab.onos.cluster.ControllerNode;
25import org.onlab.onos.store.service.DatabaseAdminService;
26import org.onlab.onos.store.service.DatabaseException;
27import org.onlab.onos.store.service.DatabaseService;
28import org.onlab.onos.store.service.NoSuchTableException;
29import org.onlab.onos.store.service.OptimisticLockException;
30import org.onlab.onos.store.service.OptionalResult;
31import org.onlab.onos.store.service.PreconditionFailedException;
32import org.onlab.onos.store.service.ReadRequest;
33import org.onlab.onos.store.service.ReadResult;
34import org.onlab.onos.store.service.WriteAborted;
35import org.onlab.onos.store.service.WriteRequest;
36import org.onlab.onos.store.service.WriteResult;
37import org.slf4j.Logger;
38
39import com.google.common.collect.Lists;
40
41/**
42 * Strongly consistent and durable state management service based on
43 * Copycat implementation of Raft consensus protocol.
44 */
45@Component(immediate = true)
46@Service
47public class DatabaseManager implements DatabaseService, DatabaseAdminService {
48
49 private final Logger log = getLogger(getClass());
50
51 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
52 ClusterService clusterService;
53
54 public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
55
56 private Copycat copycat;
57 private DatabaseClient client;
58
59 @Activate
60 public void activate() {
61 TcpMember localMember =
62 new TcpMember(
63 clusterService.getLocalNode().ip().toString(),
64 clusterService.getLocalNode().tcpPort());
65 List<TcpMember> remoteMembers = Lists.newArrayList();
66
67 for (ControllerNode node : clusterService.getNodes()) {
68 TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
69 if (!member.equals(localMember)) {
70 remoteMembers.add(member);
71 }
72 }
73
74 // Configure the cluster.
75 TcpClusterConfig config = new TcpClusterConfig();
76
77 config.setLocalMember(localMember);
78 config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
79
80 // Create the cluster.
81 TcpCluster cluster = new TcpCluster(config);
82
83 StateMachine stateMachine = new DatabaseStateMachine();
84 ControllerNode thisNode = clusterService.getLocalNode();
85 Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
86
87 copycat = new Copycat(stateMachine, consensusLog, cluster, new NettyProtocol());
88 copycat.start();
89
90 client = new DatabaseClient(new Endpoint(localMember.host(), localMember.port()));
91
92 log.info("Started.");
93 }
94
95 @Activate
96 public void deactivate() {
97 copycat.stop();
98 }
99
100 @Override
101 public boolean createTable(String name) {
102 return client.createTable(name);
103 }
104
105 @Override
106 public void dropTable(String name) {
107 client.dropTable(name);
108 }
109
110 @Override
111 public void dropAllTables() {
112 client.dropAllTables();
113 }
114
115 @Override
116 public List<String> listTables() {
117 return client.listTables();
118 }
119
120 @Override
121 public ReadResult read(ReadRequest request) {
122 return batchRead(Arrays.asList(request)).get(0).get();
123 }
124
125 @Override
126 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
127 List<ReadRequest> batch) {
128 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
129 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
130 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
131 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
132 new NoSuchTableException()));
133 } else {
134 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
135 internalReadResult.result()));
136 }
137 }
138 return readResults;
139 }
140
141 @Override
142 public WriteResult write(WriteRequest request) {
143 return batchWrite(Arrays.asList(request)).get(0).get();
144 }
145
146 @Override
147 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
148 List<WriteRequest> batch) {
149 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
150 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
151 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
152 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
153 new NoSuchTableException()));
154 } else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
155 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
156 new OptimisticLockException()));
157 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
158 // TODO: throw a different exception?
159 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
160 new PreconditionFailedException()));
161 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
162 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
163 new WriteAborted()));
164 } else {
165 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
166 internalWriteResult.result()));
167 }
168 }
169 return writeResults;
170
171 }
172
173 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
174
175 private final R result;
176 private final DatabaseException exception;
177
178 public DatabaseOperationResult(R result) {
179 this.result = result;
180 this.exception = null;
181 }
182
183 public DatabaseOperationResult(DatabaseException exception) {
184 this.result = null;
185 this.exception = exception;
186 }
187
188 @Override
189 public R get() {
190 if (result != null) {
191 return result;
192 }
193 throw exception;
194 }
195
196 @Override
197 public boolean hasValidResult() {
198 return result != null;
199 }
200
201 @Override
202 public String toString() {
203 if (result != null) {
204 return result.toString();
205 } else {
206 return exception.toString();
207 }
208 }
209 }
210}