blob: 6a84e045350736ac58fb21d8f241cf006922cae9 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.ArrayList;
6import java.util.Arrays;
7import java.util.List;
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -08008import java.util.concurrent.CountDownLatch;
9import java.util.concurrent.TimeUnit;
Madan Jampani08822c42014-11-04 17:17:46 -080010
11import net.kuujo.copycat.Copycat;
12import net.kuujo.copycat.StateMachine;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080013import net.kuujo.copycat.cluster.ClusterConfig;
Madan Jampani08822c42014-11-04 17:17:46 -080014import net.kuujo.copycat.cluster.TcpCluster;
15import net.kuujo.copycat.cluster.TcpClusterConfig;
16import net.kuujo.copycat.cluster.TcpMember;
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080017import net.kuujo.copycat.log.InMemoryLog;
Madan Jampani08822c42014-11-04 17:17:46 -080018import net.kuujo.copycat.log.Log;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080019
Madan Jampani08822c42014-11-04 17:17:46 -080020import org.apache.felix.scr.annotations.Activate;
Madan Jampanidfbfa182014-11-04 22:06:41 -080021import org.apache.felix.scr.annotations.Component;
Yuta HIGUCHI657626e2014-11-04 20:54:58 -080022import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani08822c42014-11-04 17:17:46 -080023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampanidfbfa182014-11-04 22:06:41 -080025import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080026import org.onlab.onos.cluster.ClusterEvent;
27import org.onlab.onos.cluster.ClusterEventListener;
Madan Jampani08822c42014-11-04 17:17:46 -080028import org.onlab.onos.cluster.ClusterService;
29import org.onlab.onos.cluster.ControllerNode;
30import org.onlab.onos.store.service.DatabaseAdminService;
31import org.onlab.onos.store.service.DatabaseException;
32import org.onlab.onos.store.service.DatabaseService;
33import org.onlab.onos.store.service.NoSuchTableException;
34import org.onlab.onos.store.service.OptimisticLockException;
35import org.onlab.onos.store.service.OptionalResult;
Madan Jampani08822c42014-11-04 17:17:46 -080036import org.onlab.onos.store.service.ReadRequest;
37import org.onlab.onos.store.service.ReadResult;
38import org.onlab.onos.store.service.WriteAborted;
39import org.onlab.onos.store.service.WriteRequest;
40import org.onlab.onos.store.service.WriteResult;
41import org.slf4j.Logger;
42
Madan Jampani08822c42014-11-04 17:17:46 -080043/**
44 * Strongly consistent and durable state management service based on
45 * Copycat implementation of Raft consensus protocol.
46 */
Madan Jampanidfbfa182014-11-04 22:06:41 -080047@Component(immediate = true)
48@Service
Madan Jampani08822c42014-11-04 17:17:46 -080049public class DatabaseManager implements DatabaseService, DatabaseAdminService {
50
51 private final Logger log = getLogger(getClass());
52
53 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI5001ba92014-11-04 21:33:54 -080054 protected ClusterService clusterService;
Madan Jampani08822c42014-11-04 17:17:46 -080055
Madan Jampani9b19a822014-11-04 21:37:13 -080056 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI0c1c1002014-11-05 13:47:25 -080057 protected DatabaseProtocolService copycatMessagingProtocol;
Madan Jampani9b19a822014-11-04 21:37:13 -080058
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080059 public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log";
Madan Jampani08822c42014-11-04 17:17:46 -080060
61 private Copycat copycat;
62 private DatabaseClient client;
63
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080064 // guarded by synchronized block
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080065 private ClusterConfig<TcpMember> clusterConfig;
66
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080067 private CountDownLatch clusterEventLatch;
68
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080069 private ClusterEventListener clusterEventListener;
70
Madan Jampani08822c42014-11-04 17:17:46 -080071 @Activate
72 public void activate() {
Madan Jampanidfbfa182014-11-04 22:06:41 -080073
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080074 // TODO: Not every node should be part of the consensus ring.
Madan Jampanidfbfa182014-11-04 22:06:41 -080075
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080076 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani08822c42014-11-04 17:17:46 -080077 TcpMember localMember =
78 new TcpMember(
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080079 localNode.ip().toString(),
80 localNode.tcpPort());
81
82 clusterConfig = new TcpClusterConfig();
83 clusterConfig.setLocalMember(localMember);
84
85 List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
86
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080087 clusterEventLatch = new CountDownLatch(1);
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -080088 clusterEventListener = new InternalClusterEventListener();
89 clusterService.addListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -080090
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -080091 // note: from this point beyond, clusterConfig requires synchronization
92
Madan Jampani08822c42014-11-04 17:17:46 -080093 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani9b19a822014-11-04 21:37:13 -080094 TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
Madan Jampani08822c42014-11-04 17:17:46 -080095 if (!member.equals(localMember)) {
96 remoteMembers.add(member);
97 }
98 }
99
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800100 if (remoteMembers.isEmpty()) {
101 log.info("This node is the only node in the cluster. "
102 + "Waiting for others to show up.");
103 // FIXME: hack trying to relax cases forming multiple consensus rings.
104 // add seed node configuration to avoid this
Madan Jampani08822c42014-11-04 17:17:46 -0800105
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800106 // If the node is alone on it's own, wait some time
107 // hoping other will come up soon
108 try {
109 if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
110 log.info("Starting as single node cluster");
111 }
112 } catch (InterruptedException e) {
113 log.info("Interrupted waiting for others", e);
114 }
115 }
Madan Jampani08822c42014-11-04 17:17:46 -0800116
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800117 final TcpCluster cluster;
118 synchronized (clusterConfig) {
119 clusterConfig.addRemoteMembers(remoteMembers);
120
121 // Create the cluster.
122 cluster = new TcpCluster(clusterConfig);
123 }
124 log.info("Starting cluster: {}", cluster);
125
Madan Jampani08822c42014-11-04 17:17:46 -0800126
127 StateMachine stateMachine = new DatabaseStateMachine();
Madan Jampani2ee20002014-11-06 20:06:12 -0800128 Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800129 ClusterMessagingProtocol.SERIALIZER);
Madan Jampani08822c42014-11-04 17:17:46 -0800130
Madan Jampani9b19a822014-11-04 21:37:13 -0800131 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
Madan Jampani08822c42014-11-04 17:17:46 -0800132 copycat.start();
133
Madan Jampani9b19a822014-11-04 21:37:13 -0800134 client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
Madan Jampani08822c42014-11-04 17:17:46 -0800135
136 log.info("Started.");
137 }
138
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800139 @Deactivate
Madan Jampani08822c42014-11-04 17:17:46 -0800140 public void deactivate() {
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800141 clusterService.removeListener(clusterEventListener);
Madan Jampani08822c42014-11-04 17:17:46 -0800142 copycat.stop();
Yuta HIGUCHI657626e2014-11-04 20:54:58 -0800143 log.info("Stopped.");
Madan Jampani08822c42014-11-04 17:17:46 -0800144 }
145
146 @Override
147 public boolean createTable(String name) {
148 return client.createTable(name);
149 }
150
151 @Override
152 public void dropTable(String name) {
153 client.dropTable(name);
154 }
155
156 @Override
157 public void dropAllTables() {
158 client.dropAllTables();
159 }
160
161 @Override
162 public List<String> listTables() {
163 return client.listTables();
164 }
165
166 @Override
167 public ReadResult read(ReadRequest request) {
168 return batchRead(Arrays.asList(request)).get(0).get();
169 }
170
171 @Override
172 public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
173 List<ReadRequest> batch) {
174 List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
175 for (InternalReadResult internalReadResult : client.batchRead(batch)) {
176 if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
177 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
178 new NoSuchTableException()));
179 } else {
180 readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
181 internalReadResult.result()));
182 }
183 }
184 return readResults;
185 }
186
187 @Override
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800188 public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
189 return batchWrite(Arrays.asList(request)).get(0);
190 }
191
192 @Override
Yuta HIGUCHIc6b8f612014-11-06 19:04:13 -0800193 public WriteResult write(WriteRequest request) {
194// throws OptimisticLockException, PreconditionFailedException {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800195 return writeNothrow(request).get();
Madan Jampani08822c42014-11-04 17:17:46 -0800196 }
197
198 @Override
199 public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
200 List<WriteRequest> batch) {
201 List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
202 for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
203 if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
204 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
205 new NoSuchTableException()));
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800206 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
Madan Jampani08822c42014-11-04 17:17:46 -0800207 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
208 new OptimisticLockException()));
209 } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
210 // TODO: throw a different exception?
211 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800212 new OptimisticLockException()));
Madan Jampani08822c42014-11-04 17:17:46 -0800213 } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
214 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
215 new WriteAborted()));
216 } else {
217 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
218 internalWriteResult.result()));
219 }
220 }
221 return writeResults;
222
223 }
224
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800225 private final class InternalClusterEventListener
226 implements ClusterEventListener {
227
228 @Override
229 public void event(ClusterEvent event) {
230 // TODO: Not every node should be part of the consensus ring.
231
232 final ControllerNode node = event.subject();
233 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
234 node.tcpPort());
235
236 log.trace("{}", event);
237 switch (event.type()) {
238 case INSTANCE_ACTIVATED:
239 case INSTANCE_ADDED:
240 log.info("{} was added to the cluster", tcpMember);
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800241 synchronized (clusterConfig) {
242 clusterConfig.addRemoteMember(tcpMember);
243 }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800244 break;
245 case INSTANCE_DEACTIVATED:
246 case INSTANCE_REMOVED:
Yuta HIGUCHIc53411e2014-11-10 18:32:57 -0800247 // FIXME to be replaced with admin interface
248// log.info("{} was removed from the cluster", tcpMember);
249// synchronized (clusterConfig) {
250// clusterConfig.removeRemoteMember(tcpMember);
251// }
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800252 break;
253 default:
254 break;
255 }
Yuta HIGUCHI79a1e5e2014-11-05 17:42:01 -0800256 if (copycat != null) {
257 log.debug("Current cluster: {}", copycat.cluster());
258 }
259 clusterEventLatch.countDown();
Yuta HIGUCHI5027b6b2014-11-05 16:23:26 -0800260 }
261
262 }
263
264 public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
265 public KryoRegisteredInMemoryLog() {
266 super();
267 // required to deserialize object across bundles
268 super.kryo.register(TcpMember.class, new TcpMemberSerializer());
269 super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
270 }
271 }
272
Madan Jampani08822c42014-11-04 17:17:46 -0800273 private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
274
275 private final R result;
276 private final DatabaseException exception;
277
278 public DatabaseOperationResult(R result) {
279 this.result = result;
280 this.exception = null;
281 }
282
283 public DatabaseOperationResult(DatabaseException exception) {
284 this.result = null;
285 this.exception = exception;
286 }
287
288 @Override
289 public R get() {
290 if (result != null) {
291 return result;
292 }
293 throw exception;
294 }
295
296 @Override
297 public boolean hasValidResult() {
298 return result != null;
299 }
300
301 @Override
302 public String toString() {
303 if (result != null) {
304 return result.toString();
305 } else {
306 return exception.toString();
307 }
308 }
309 }
310}