blob: 0c334855b4255e3bfc70df80789b9fd99405883d [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
Yuta HIGUCHI361664e2014-11-06 17:28:47 -08005import java.io.File;
Madan Jampani08822c42014-11-04 17:17:46 -08006import java.util.ArrayList;
7import java.util.Arrays;
8import java.util.List;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -08009import java.util.concurrent.CountDownLatch;
10import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080011
12import net.kuujo.copycat.Copycat;
13import net.kuujo.copycat.StateMachine;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080014import net.kuujo.copycat.cluster.ClusterConfig;
Madan Jampani08822c42014-11-04 17:17:46 -080015import net.kuujo.copycat.cluster.TcpCluster;
16import net.kuujo.copycat.cluster.TcpClusterConfig;
17import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080018import net.kuujo.copycat.log.InMemoryLog;
Madan Jampani08822c42014-11-04 17:17:46 -080019import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080020
Madan Jampani08822c42014-11-04 17:17:46 -080021import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080022import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080023import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080024import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080026import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080027import org.onlab.onos.cluster.ClusterEvent;
28import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080029import org.onlab.onos.cluster.ClusterService;
30import org.onlab.onos.cluster.ControllerNode;
31import org.onlab.onos.store.service.DatabaseAdminService;
32import org.onlab.onos.store.service.DatabaseException;
33import org.onlab.onos.store.service.DatabaseService;
34import org.onlab.onos.store.service.NoSuchTableException;
35import org.onlab.onos.store.service.OptimisticLockException;
36import org.onlab.onos.store.service.OptionalResult;
Madan Jampani08822c42014-11-04 17:17:46 -080037import org.onlab.onos.store.service.ReadRequest;
38import org.onlab.onos.store.service.ReadResult;
39import org.onlab.onos.store.service.WriteAborted;
40import org.onlab.onos.store.service.WriteRequest;
41import org.onlab.onos.store.service.WriteResult;
42import org.slf4j.Logger;
43
Madan Jampani08822c42014-11-04 17:17:46 -080044/**
45 * Strongly consistent and durable state management service based on
46 * Copycat implementation of Raft consensus protocol.
47 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080048@Component(immediate = true)
49@Service
Madan Jampani08822c42014-11-04 17:17:46 -080050public class DatabaseManager implements DatabaseService, DatabaseAdminService {
51
52 private final Logger log = getLogger(getClass());
53
54 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080055 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080056
Madan Jampani9b19a822014-11-04 21:37:13 -080057 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080058 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080059
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080060 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log";
Madan Jampani08822c42014-11-04 17:17:46 -080061
62 private Copycat copycat;
63 private DatabaseClient client;
64
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080065 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080066 private ClusterConfig<TcpMember> clusterConfig;
67
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080068 private CountDownLatch clusterEventLatch;
69
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080070 private ClusterEventListener clusterEventListener;
71
Madan Jampani08822c42014-11-04 17:17:46 -080072 @Activate
73 public void activate() {
Madan Jampanidfbfa182014-11-04 22:06:41 -080074
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080075 // TODO: Not every node should be part of the consensus ring.
Madan Jampanidfbfa182014-11-04 22:06:41 -080076
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080077 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani08822c42014-11-04 17:17:46 -080078 TcpMember localMember =
79 new TcpMember(
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080080 localNode.ip().toString(),
81 localNode.tcpPort());
82
83 clusterConfig = new TcpClusterConfig();
84 clusterConfig.setLocalMember(localMember);
85
86 List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
87
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080088 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080089 clusterEventListener = new InternalClusterEventListener();
90 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -080091
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080092 // note: from this point beyond, clusterConfig requires synchronization
93
Madan Jampani08822c42014-11-04 17:17:46 -080094 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani9b19a822014-11-04 21:37:13 -080095 TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
Madan Jampani08822c42014-11-04 17:17:46 -080096 if (!member.equals(localMember)) {
97 remoteMembers.add(member);
98 }
99 }
100
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800101 if (remoteMembers.isEmpty()) {
102 log.info("This node is the only node in the cluster. "
103 + "Waiting for others to show up.");
104 // FIXME: hack trying to relax cases forming multiple consensus rings.
105 // add seed node configuration to avoid this
Madan Jampani08822c42014-11-04 17:17:46 -0800106
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800107 // If the node is alone on it's own, wait some time
108 // hoping other will come up soon
109 try {
110 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
111 log.info("Starting as single node cluster");
112 }
113 } catch (InterruptedException e) {
114 log.info("Interrupted waiting for others", e);
115 }
116 }
Madan Jampani08822c42014-11-04 17:17:46 -0800117
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800118 final TcpCluster cluster;
119 synchronized (clusterConfig) {
120 clusterConfig.addRemoteMembers(remoteMembers);
121
122 // Create the cluster.
123 cluster = new TcpCluster(clusterConfig);
124 }
125 log.info("Starting cluster: {}", cluster);
126
Madan Jampani08822c42014-11-04 17:17:46 -0800127
128 StateMachine stateMachine = new DatabaseStateMachine();
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800129 // Chronicle + OSGi issue
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -0800130 //Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800131 //Log consensusLog = new KryoRegisteredInMemoryLog();
132 Log consensusLog = new MapDBLog(new File(LOG_FILE_PREFIX + localNode.id()),
133 ClusterMessagingProtocol.SERIALIZER);
Madan Jampani08822c42014-11-04 17:17:46 -0800134
Madan Jampani9b19a822014-11-04 21:37:13 -0800135 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampani08822c42014-11-04 17:17:46 -0800136 copycat.start();
137
Madan Jampani9b19a822014-11-04 21:37:13 -0800138 client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
Madan Jampani08822c42014-11-04 17:17:46 -0800139
140 log.info("Started.");
141 }
142
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800143 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800144 public void deactivate() {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800145 clusterService.removeListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800146 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800147 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800148 }
149
150 @Override
151 public boolean createTable(String name) {
152 return client.createTable(name);
153 }
154
155 @Override
156 public void dropTable(String name) {
157 client.dropTable(name);
158 }
159
160 @Override
161 public void dropAllTables() {
162 client.dropAllTables();
163 }
164
165 @Override
166 public List<String> listTables() {
167 return client.listTables();
168 }
169
170 @Override
171 public ReadResult read(ReadRequest request) {
172 return batchRead(Arrays.asList(request)).get(0).get();
173 }
174
175 @Override
176 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
177 List<ReadRequest> batch) {
178 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
179 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
180 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
181 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
182 new NoSuchTableException()));
183 } else {
184 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
185 internalReadResult.result()));
186 }
187 }
188 return readResults;
189 }
190
191 @Override
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800192 public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
193 return batchWrite(Arrays.asList(request)).get(0);
194 }
195
196 @Override
Yuta HIGUCHIc6b8f612014-11-06 19:04:13 -0800197 public WriteResult write(WriteRequest request) {
198// throws OptimisticLockException, PreconditionFailedException {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800199 return writeNothrow(request).get();
Madan Jampani08822c42014-11-04 17:17:46 -0800200 }
201
202 @Override
203 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
204 List<WriteRequest> batch) {
205 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
206 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
207 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
208 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
209 new NoSuchTableException()));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800210 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
Madan Jampani08822c42014-11-04 17:17:46 -0800211 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
212 new OptimisticLockException()));
213 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
214 // TODO: throw a different exception?
215 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800216 new OptimisticLockException()));
Madan Jampani08822c42014-11-04 17:17:46 -0800217 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
218 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
219 new WriteAborted()));
220 } else {
221 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
222 internalWriteResult.result()));
223 }
224 }
225 return writeResults;
226
227 }
228
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800229 private final class InternalClusterEventListener
230 implements ClusterEventListener {
231
232 @Override
233 public void event(ClusterEvent event) {
234 // TODO: Not every node should be part of the consensus ring.
235
236 final ControllerNode node = event.subject();
237 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
238 node.tcpPort());
239
240 log.trace("{}", event);
241 switch (event.type()) {
242 case INSTANCE_ACTIVATED:
243 case INSTANCE_ADDED:
244 log.info("{} was added to the cluster", tcpMember);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800245 synchronized (clusterConfig) {
246 clusterConfig.addRemoteMember(tcpMember);
247 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800248 break;
249 case INSTANCE_DEACTIVATED:
250 case INSTANCE_REMOVED:
251 log.info("{} was removed from the cluster", tcpMember);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800252 synchronized (clusterConfig) {
253 clusterConfig.removeRemoteMember(tcpMember);
254 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800255 break;
256 default:
257 break;
258 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800259 if (copycat != null) {
260 log.debug("Current cluster: {}", copycat.cluster());
261 }
262 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800263 }
264
265 }
266
267 public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
268 public KryoRegisteredInMemoryLog() {
269 super();
270 // required to deserialize object across bundles
271 super.kryo.register(TcpMember.class, new TcpMemberSerializer());
272 super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
273 }
274 }
275
Madan Jampani08822c42014-11-04 17:17:46 -0800276 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
277
278 private final R result;
279 private final DatabaseException exception;
280
281 public DatabaseOperationResult(R result) {
282 this.result = result;
283 this.exception = null;
284 }
285
286 public DatabaseOperationResult(DatabaseException exception) {
287 this.result = null;
288 this.exception = exception;
289 }
290
291 @Override
292 public R get() {
293 if (result != null) {
294 return result;
295 }
296 throw exception;
297 }
298
299 @Override
300 public boolean hasValidResult() {
301 return result != null;
302 }
303
304 @Override
305 public String toString() {
306 if (result != null) {
307 return result.toString();
308 } else {
309 return exception.toString();
310 }
311 }
312 }
313}