blob: 421f9cce17628fcd769a4c55e129df7e919f5ae6 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
Yuta HIGUCHI361664e2014-11-06 17:28:47 -08005import java.io.File;
Madan Jampani08822c42014-11-04 17:17:46 -08006import java.util.ArrayList;
7import java.util.Arrays;
8import java.util.List;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -08009import java.util.concurrent.CountDownLatch;
10import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080011
12import net.kuujo.copycat.Copycat;
13import net.kuujo.copycat.StateMachine;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080014import net.kuujo.copycat.cluster.ClusterConfig;
Madan Jampani08822c42014-11-04 17:17:46 -080015import net.kuujo.copycat.cluster.TcpCluster;
16import net.kuujo.copycat.cluster.TcpClusterConfig;
17import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080018import net.kuujo.copycat.log.InMemoryLog;
Madan Jampani08822c42014-11-04 17:17:46 -080019import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080020
Madan Jampani08822c42014-11-04 17:17:46 -080021import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080022import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080023import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080024import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080026import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080027import org.onlab.onos.cluster.ClusterEvent;
28import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080029import org.onlab.onos.cluster.ClusterService;
30import org.onlab.onos.cluster.ControllerNode;
31import org.onlab.onos.store.service.DatabaseAdminService;
32import org.onlab.onos.store.service.DatabaseException;
33import org.onlab.onos.store.service.DatabaseService;
34import org.onlab.onos.store.service.NoSuchTableException;
35import org.onlab.onos.store.service.OptimisticLockException;
36import org.onlab.onos.store.service.OptionalResult;
37import org.onlab.onos.store.service.PreconditionFailedException;
38import org.onlab.onos.store.service.ReadRequest;
39import org.onlab.onos.store.service.ReadResult;
40import org.onlab.onos.store.service.WriteAborted;
41import org.onlab.onos.store.service.WriteRequest;
42import org.onlab.onos.store.service.WriteResult;
43import org.slf4j.Logger;
44
Madan Jampani08822c42014-11-04 17:17:46 -080045/**
46 * Strongly consistent and durable state management service based on
47 * Copycat implementation of Raft consensus protocol.
48 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080049@Component(immediate = true)
50@Service
Madan Jampani08822c42014-11-04 17:17:46 -080051public class DatabaseManager implements DatabaseService, DatabaseAdminService {
52
53 private final Logger log = getLogger(getClass());
54
55 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080056 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080057
Madan Jampani9b19a822014-11-04 21:37:13 -080058 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080059 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080060
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080061 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log";
Madan Jampani08822c42014-11-04 17:17:46 -080062
63 private Copycat copycat;
64 private DatabaseClient client;
65
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080066 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080067 private ClusterConfig<TcpMember> clusterConfig;
68
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080069 private CountDownLatch clusterEventLatch;
70
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080071 private ClusterEventListener clusterEventListener;
72
Madan Jampani08822c42014-11-04 17:17:46 -080073 @Activate
74 public void activate() {
Madan Jampanidfbfa182014-11-04 22:06:41 -080075
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080076 // TODO: Not every node should be part of the consensus ring.
Madan Jampanidfbfa182014-11-04 22:06:41 -080077
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080078 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani08822c42014-11-04 17:17:46 -080079 TcpMember localMember =
80 new TcpMember(
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080081 localNode.ip().toString(),
82 localNode.tcpPort());
83
84 clusterConfig = new TcpClusterConfig();
85 clusterConfig.setLocalMember(localMember);
86
87 List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
88
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080089 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080090 clusterEventListener = new InternalClusterEventListener();
91 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -080092
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080093 // note: from this point beyond, clusterConfig requires synchronization
94
Madan Jampani08822c42014-11-04 17:17:46 -080095 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani9b19a822014-11-04 21:37:13 -080096 TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
Madan Jampani08822c42014-11-04 17:17:46 -080097 if (!member.equals(localMember)) {
98 remoteMembers.add(member);
99 }
100 }
101
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800102 if (remoteMembers.isEmpty()) {
103 log.info("This node is the only node in the cluster. "
104 + "Waiting for others to show up.");
105 // FIXME: hack trying to relax cases forming multiple consensus rings.
106 // add seed node configuration to avoid this
Madan Jampani08822c42014-11-04 17:17:46 -0800107
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800108 // If the node is alone on it's own, wait some time
109 // hoping other will come up soon
110 try {
111 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
112 log.info("Starting as single node cluster");
113 }
114 } catch (InterruptedException e) {
115 log.info("Interrupted waiting for others", e);
116 }
117 }
Madan Jampani08822c42014-11-04 17:17:46 -0800118
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800119 final TcpCluster cluster;
120 synchronized (clusterConfig) {
121 clusterConfig.addRemoteMembers(remoteMembers);
122
123 // Create the cluster.
124 cluster = new TcpCluster(clusterConfig);
125 }
126 log.info("Starting cluster: {}", cluster);
127
Madan Jampani08822c42014-11-04 17:17:46 -0800128
129 StateMachine stateMachine = new DatabaseStateMachine();
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800130 // Chronicle + OSGi issue
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -0800131 //Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800132 //Log consensusLog = new KryoRegisteredInMemoryLog();
133 Log consensusLog = new MapDBLog(new File(LOG_FILE_PREFIX + localNode.id()),
134 ClusterMessagingProtocol.SERIALIZER);
Madan Jampani08822c42014-11-04 17:17:46 -0800135
Madan Jampani9b19a822014-11-04 21:37:13 -0800136 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampani08822c42014-11-04 17:17:46 -0800137 copycat.start();
138
Madan Jampani9b19a822014-11-04 21:37:13 -0800139 client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
Madan Jampani08822c42014-11-04 17:17:46 -0800140
141 log.info("Started.");
142 }
143
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800144 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800145 public void deactivate() {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800146 clusterService.removeListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800147 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800148 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800149 }
150
151 @Override
152 public boolean createTable(String name) {
153 return client.createTable(name);
154 }
155
156 @Override
157 public void dropTable(String name) {
158 client.dropTable(name);
159 }
160
161 @Override
162 public void dropAllTables() {
163 client.dropAllTables();
164 }
165
166 @Override
167 public List<String> listTables() {
168 return client.listTables();
169 }
170
171 @Override
172 public ReadResult read(ReadRequest request) {
173 return batchRead(Arrays.asList(request)).get(0).get();
174 }
175
176 @Override
177 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
178 List<ReadRequest> batch) {
179 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
180 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
181 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
182 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
183 new NoSuchTableException()));
184 } else {
185 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
186 internalReadResult.result()));
187 }
188 }
189 return readResults;
190 }
191
192 @Override
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800193 public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
194 return batchWrite(Arrays.asList(request)).get(0);
195 }
196
197 @Override
198 public WriteResult write(WriteRequest request)
199 throws OptimisticLockException, PreconditionFailedException {
200 return writeNothrow(request).get();
Madan Jampani08822c42014-11-04 17:17:46 -0800201 }
202
203 @Override
204 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
205 List<WriteRequest> batch) {
206 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
207 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
208 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
209 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
210 new NoSuchTableException()));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800211 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
Madan Jampani08822c42014-11-04 17:17:46 -0800212 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
213 new OptimisticLockException()));
214 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
215 // TODO: throw a different exception?
216 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800217 new OptimisticLockException()));
Madan Jampani08822c42014-11-04 17:17:46 -0800218 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
219 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
220 new WriteAborted()));
221 } else {
222 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
223 internalWriteResult.result()));
224 }
225 }
226 return writeResults;
227
228 }
229
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800230 private final class InternalClusterEventListener
231 implements ClusterEventListener {
232
233 @Override
234 public void event(ClusterEvent event) {
235 // TODO: Not every node should be part of the consensus ring.
236
237 final ControllerNode node = event.subject();
238 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
239 node.tcpPort());
240
241 log.trace("{}", event);
242 switch (event.type()) {
243 case INSTANCE_ACTIVATED:
244 case INSTANCE_ADDED:
245 log.info("{} was added to the cluster", tcpMember);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800246 synchronized (clusterConfig) {
247 clusterConfig.addRemoteMember(tcpMember);
248 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800249 break;
250 case INSTANCE_DEACTIVATED:
251 case INSTANCE_REMOVED:
252 log.info("{} was removed from the cluster", tcpMember);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800253 synchronized (clusterConfig) {
254 clusterConfig.removeRemoteMember(tcpMember);
255 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800256 break;
257 default:
258 break;
259 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800260 if (copycat != null) {
261 log.debug("Current cluster: {}", copycat.cluster());
262 }
263 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800264 }
265
266 }
267
268 public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
269 public KryoRegisteredInMemoryLog() {
270 super();
271 // required to deserialize object across bundles
272 super.kryo.register(TcpMember.class, new TcpMemberSerializer());
273 super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
274 }
275 }
276
Madan Jampani08822c42014-11-04 17:17:46 -0800277 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
278
279 private final R result;
280 private final DatabaseException exception;
281
282 public DatabaseOperationResult(R result) {
283 this.result = result;
284 this.exception = null;
285 }
286
287 public DatabaseOperationResult(DatabaseException exception) {
288 this.result = null;
289 this.exception = exception;
290 }
291
292 @Override
293 public R get() {
294 if (result != null) {
295 return result;
296 }
297 throw exception;
298 }
299
300 @Override
301 public boolean hasValidResult() {
302 return result != null;
303 }
304
305 @Override
306 public String toString() {
307 if (result != null) {
308 return result.toString();
309 } else {
310 return exception.toString();
311 }
312 }
313 }
314}