blob: 19ee882b60d18253580896cc948a26075128dc54 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.ArrayList;
6import java.util.Arrays;
7import java.util.List;
8
9import net.kuujo.copycat.Copycat;
10import net.kuujo.copycat.StateMachine;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080011import net.kuujo.copycat.cluster.ClusterConfig;
Madan Jampani08822c42014-11-04 17:17:46 -080012import net.kuujo.copycat.cluster.TcpCluster;
13import net.kuujo.copycat.cluster.TcpClusterConfig;
14import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080015import net.kuujo.copycat.log.InMemoryLog;
Madan Jampani08822c42014-11-04 17:17:46 -080016import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080017
Madan Jampani08822c42014-11-04 17:17:46 -080018import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080019import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080020import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080021import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080023import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080024import org.onlab.onos.cluster.ClusterEvent;
25import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080026import org.onlab.onos.cluster.ClusterService;
27import org.onlab.onos.cluster.ControllerNode;
28import org.onlab.onos.store.service.DatabaseAdminService;
29import org.onlab.onos.store.service.DatabaseException;
30import org.onlab.onos.store.service.DatabaseService;
31import org.onlab.onos.store.service.NoSuchTableException;
32import org.onlab.onos.store.service.OptimisticLockException;
33import org.onlab.onos.store.service.OptionalResult;
34import org.onlab.onos.store.service.PreconditionFailedException;
35import org.onlab.onos.store.service.ReadRequest;
36import org.onlab.onos.store.service.ReadResult;
37import org.onlab.onos.store.service.WriteAborted;
38import org.onlab.onos.store.service.WriteRequest;
39import org.onlab.onos.store.service.WriteResult;
40import org.slf4j.Logger;
41
Madan Jampani08822c42014-11-04 17:17:46 -080042/**
43 * Strongly consistent and durable state management service based on
44 * Copycat implementation of Raft consensus protocol.
45 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080046@Component(immediate = true)
47@Service
Madan Jampani08822c42014-11-04 17:17:46 -080048public class DatabaseManager implements DatabaseService, DatabaseAdminService {
49
50 private final Logger log = getLogger(getClass());
51
52 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080053 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080054
Madan Jampani9b19a822014-11-04 21:37:13 -080055 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080056 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080057
Madan Jampani08822c42014-11-04 17:17:46 -080058 public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
59
60 private Copycat copycat;
61 private DatabaseClient client;
62
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080063 // TODO: check if synchronization is required to read/modify this
64 private ClusterConfig<TcpMember> clusterConfig;
65
66 private ClusterEventListener clusterEventListener;
67
Madan Jampani08822c42014-11-04 17:17:46 -080068 @Activate
69 public void activate() {
Madan Jampanidfbfa182014-11-04 22:06:41 -080070
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080071 // TODO: Not every node should be part of the consensus ring.
Madan Jampanidfbfa182014-11-04 22:06:41 -080072
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080073 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani08822c42014-11-04 17:17:46 -080074 TcpMember localMember =
75 new TcpMember(
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080076 localNode.ip().toString(),
77 localNode.tcpPort());
78
79 clusterConfig = new TcpClusterConfig();
80 clusterConfig.setLocalMember(localMember);
81
82 List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
83
84 clusterEventListener = new InternalClusterEventListener();
85 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -080086
87 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani9b19a822014-11-04 21:37:13 -080088 TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
Madan Jampani08822c42014-11-04 17:17:46 -080089 if (!member.equals(localMember)) {
90 remoteMembers.add(member);
91 }
92 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080093 clusterConfig.addRemoteMembers(remoteMembers);
Madan Jampani08822c42014-11-04 17:17:46 -080094
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080095 log.info("Starting cluster with Local:[{}], Remote:{}", localMember, remoteMembers);
Madan Jampani08822c42014-11-04 17:17:46 -080096
Madan Jampani08822c42014-11-04 17:17:46 -080097
98 // Create the cluster.
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080099 TcpCluster cluster = new TcpCluster(clusterConfig);
Madan Jampani08822c42014-11-04 17:17:46 -0800100
101 StateMachine stateMachine = new DatabaseStateMachine();
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -0800102 // FIXME resolve Chronicle + OSGi issue
103 //Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800104 Log consensusLog = new KryoRegisteredInMemoryLog();
Madan Jampani08822c42014-11-04 17:17:46 -0800105
Madan Jampani9b19a822014-11-04 21:37:13 -0800106 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampani08822c42014-11-04 17:17:46 -0800107 copycat.start();
108
Madan Jampani9b19a822014-11-04 21:37:13 -0800109 client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
Madan Jampani08822c42014-11-04 17:17:46 -0800110
111 log.info("Started.");
112 }
113
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800114 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800115 public void deactivate() {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800116 clusterService.removeListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800117 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800118 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800119 }
120
121 @Override
122 public boolean createTable(String name) {
123 return client.createTable(name);
124 }
125
126 @Override
127 public void dropTable(String name) {
128 client.dropTable(name);
129 }
130
131 @Override
132 public void dropAllTables() {
133 client.dropAllTables();
134 }
135
136 @Override
137 public List<String> listTables() {
138 return client.listTables();
139 }
140
141 @Override
142 public ReadResult read(ReadRequest request) {
143 return batchRead(Arrays.asList(request)).get(0).get();
144 }
145
146 @Override
147 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
148 List<ReadRequest> batch) {
149 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
150 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
151 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
152 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
153 new NoSuchTableException()));
154 } else {
155 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
156 internalReadResult.result()));
157 }
158 }
159 return readResults;
160 }
161
162 @Override
163 public WriteResult write(WriteRequest request) {
164 return batchWrite(Arrays.asList(request)).get(0).get();
165 }
166
167 @Override
168 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
169 List<WriteRequest> batch) {
170 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
171 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
172 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
173 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
174 new NoSuchTableException()));
175 } else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
176 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
177 new OptimisticLockException()));
178 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
179 // TODO: throw a different exception?
180 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
181 new PreconditionFailedException()));
182 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
183 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
184 new WriteAborted()));
185 } else {
186 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
187 internalWriteResult.result()));
188 }
189 }
190 return writeResults;
191
192 }
193
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800194 private final class InternalClusterEventListener
195 implements ClusterEventListener {
196
197 @Override
198 public void event(ClusterEvent event) {
199 // TODO: Not every node should be part of the consensus ring.
200
201 final ControllerNode node = event.subject();
202 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
203 node.tcpPort());
204
205 log.trace("{}", event);
206 switch (event.type()) {
207 case INSTANCE_ACTIVATED:
208 case INSTANCE_ADDED:
209 log.info("{} was added to the cluster", tcpMember);
210 clusterConfig.addRemoteMember(tcpMember);
211 break;
212 case INSTANCE_DEACTIVATED:
213 case INSTANCE_REMOVED:
214 log.info("{} was removed from the cluster", tcpMember);
215 clusterConfig.removeRemoteMember(tcpMember);
216 break;
217 default:
218 break;
219 }
220 log.info("Current cluster: {}", clusterConfig.getMembers());
221 }
222
223 }
224
225 public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
226 public KryoRegisteredInMemoryLog() {
227 super();
228 // required to deserialize object across bundles
229 super.kryo.register(TcpMember.class, new TcpMemberSerializer());
230 super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
231 }
232 }
233
Madan Jampani08822c42014-11-04 17:17:46 -0800234 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
235
236 private final R result;
237 private final DatabaseException exception;
238
239 public DatabaseOperationResult(R result) {
240 this.result = result;
241 this.exception = null;
242 }
243
244 public DatabaseOperationResult(DatabaseException exception) {
245 this.result = null;
246 this.exception = exception;
247 }
248
249 @Override
250 public R get() {
251 if (result != null) {
252 return result;
253 }
254 throw exception;
255 }
256
257 @Override
258 public boolean hasValidResult() {
259 return result != null;
260 }
261
262 @Override
263 public String toString() {
264 if (result != null) {
265 return result.toString();
266 } else {
267 return exception.toString();
268 }
269 }
270 }
271}