blob: 3b0d8746ff1f6ca1ca75bf74c94353af93fd3406 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -08003import static org.slf4j.LoggerFactory.getLogger;
4
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -08005import java.io.ByteArrayInputStream;
6import java.io.ByteArrayOutputStream;
Madan Jampani08822c42014-11-04 17:17:46 -08007import java.util.ArrayList;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -08008import java.util.Arrays;
Madan Jampani08822c42014-11-04 17:17:46 -08009import java.util.List;
10import java.util.Map;
Madan Jampani12390c12014-11-12 00:35:56 -080011import java.util.Set;
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -080012import java.util.zip.DeflaterOutputStream;
13import java.util.zip.InflaterInputStream;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080014
Madan Jampani08822c42014-11-04 17:17:46 -080015import net.kuujo.copycat.Command;
16import net.kuujo.copycat.Query;
17import net.kuujo.copycat.StateMachine;
18
Madan Jampani12390c12014-11-12 00:35:56 -080019import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampani08822c42014-11-04 17:17:46 -080020import org.onlab.onos.store.serializers.KryoSerializer;
Madan Jampani12390c12014-11-12 00:35:56 -080021import org.onlab.onos.store.service.BatchReadRequest;
22import org.onlab.onos.store.service.BatchWriteRequest;
Madan Jampani08822c42014-11-04 17:17:46 -080023import org.onlab.onos.store.service.ReadRequest;
24import org.onlab.onos.store.service.ReadResult;
Madan Jampani12390c12014-11-12 00:35:56 -080025import org.onlab.onos.store.service.ReadStatus;
Madan Jampani37c2e702014-11-04 18:11:10 -080026import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080027import org.onlab.onos.store.service.WriteRequest;
28import org.onlab.onos.store.service.WriteResult;
Madan Jampani12390c12014-11-12 00:35:56 -080029import org.onlab.onos.store.service.WriteStatus;
Madan Jampani08822c42014-11-04 17:17:46 -080030import org.onlab.util.KryoNamespace;
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080031import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080032
Madan Jampanidef2c652014-11-12 13:50:10 -080033import com.google.common.collect.ImmutableSet;
Madan Jampani932c6ba2014-11-12 01:36:04 -080034import com.google.common.collect.Lists;
Madan Jampani08822c42014-11-04 17:17:46 -080035import com.google.common.collect.Maps;
Madan Jampanidef2c652014-11-12 13:50:10 -080036import com.google.common.collect.Sets;
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -080037import com.google.common.io.ByteStreams;
Madan Jampani08822c42014-11-04 17:17:46 -080038
Madan Jampani686fa182014-11-04 23:16:27 -080039/**
40 * StateMachine whose transitions are coordinated/replicated
41 * by Raft consensus.
42 * Each Raft cluster member has a instance of this state machine that is
43 * independently updated in lock step once there is consensus
44 * on the next transition.
45 */
Madan Jampani08822c42014-11-04 17:17:46 -080046public class DatabaseStateMachine implements StateMachine {
47
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080048 private final Logger log = getLogger(getClass());
49
Madan Jampani12390c12014-11-12 00:35:56 -080050 // message subject for database update notifications.
Madan Jampani44e6a542014-11-12 01:06:51 -080051 public static final MessageSubject DATABASE_UPDATE_EVENTS =
Madan Jampani12390c12014-11-12 00:35:56 -080052 new MessageSubject("database-update-events");
53
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080054 // serializer used for snapshot
Madan Jampani08822c42014-11-04 17:17:46 -080055 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
56 @Override
57 protected void setupKryoPool() {
58 serializerPool = KryoNamespace.newBuilder()
59 .register(VersionedValue.class)
60 .register(State.class)
Madan Jampanidef2c652014-11-12 13:50:10 -080061 .register(TableMetadata.class)
Madan Jampani12390c12014-11-12 00:35:56 -080062 .register(BatchReadRequest.class)
63 .register(BatchWriteRequest.class)
64 .register(ReadStatus.class)
65 .register(WriteStatus.class)
66 // TODO: Move this out ?
67 .register(TableModificationEvent.class)
Madan Jampani9b19a822014-11-04 21:37:13 -080068 .register(ClusterMessagingProtocol.COMMON)
Madan Jampani08822c42014-11-04 17:17:46 -080069 .build()
70 .populate(1);
71 }
72 };
73
Madan Jampanidef2c652014-11-12 13:50:10 -080074 private final Set<DatabaseUpdateEventListener> listeners = Sets.newIdentityHashSet();
Madan Jampani12390c12014-11-12 00:35:56 -080075
76 // durable internal state of the database.
Madan Jampani08822c42014-11-04 17:17:46 -080077 private State state = new State();
78
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -080079 private boolean compressSnapshot = false;
80
Madan Jampani08822c42014-11-04 17:17:46 -080081 @Command
82 public boolean createTable(String tableName) {
Madan Jampanidef2c652014-11-12 13:50:10 -080083 TableMetadata metadata = new TableMetadata(tableName);
84 return createTable(metadata);
Madan Jampani12390c12014-11-12 00:35:56 -080085 }
86
87 @Command
Madan Jampanidef2c652014-11-12 13:50:10 -080088 public boolean createTable(String tableName, int ttlMillis) {
89 TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
90 return createTable(metadata);
91 }
92
93 private boolean createTable(TableMetadata metadata) {
94 Map<String, VersionedValue> existingTable = state.getTable(metadata.tableName());
95 if (existingTable != null) {
96 return false;
Madan Jampani12390c12014-11-12 00:35:56 -080097 }
Madan Jampanidef2c652014-11-12 13:50:10 -080098 state.createTable(metadata);
99 for (DatabaseUpdateEventListener listener : listeners) {
100 listener.tableCreated(metadata);
101 }
102 return true;
Madan Jampani08822c42014-11-04 17:17:46 -0800103 }
104
105 @Command
106 public boolean dropTable(String tableName) {
Madan Jampanidef2c652014-11-12 13:50:10 -0800107 if (state.removeTable(tableName)) {
Madan Jampani12390c12014-11-12 00:35:56 -0800108 for (DatabaseUpdateEventListener listener : listeners) {
109 listener.tableDeleted(tableName);
110 }
111 return true;
112 }
113 return false;
Madan Jampani08822c42014-11-04 17:17:46 -0800114 }
115
116 @Command
117 public boolean dropAllTables() {
Madan Jampanidef2c652014-11-12 13:50:10 -0800118 Set<String> tableNames = state.getTableNames();
119 state.removeAllTables();
Madan Jampani12390c12014-11-12 00:35:56 -0800120 for (DatabaseUpdateEventListener listener : listeners) {
121 for (String tableName : tableNames) {
122 listener.tableDeleted(tableName);
123 }
124 }
Madan Jampani08822c42014-11-04 17:17:46 -0800125 return true;
126 }
127
128 @Query
Madan Jampanidef2c652014-11-12 13:50:10 -0800129 public Set<String> listTables() {
130 return ImmutableSet.copyOf(state.getTableNames());
Madan Jampani08822c42014-11-04 17:17:46 -0800131 }
132
133 @Query
Madan Jampani12390c12014-11-12 00:35:56 -0800134 public List<ReadResult> read(BatchReadRequest batchRequest) {
135 List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
136 for (ReadRequest request : batchRequest.getAsList()) {
Madan Jampanidef2c652014-11-12 13:50:10 -0800137 Map<String, VersionedValue> table = state.getTable(request.tableName());
Madan Jampani08822c42014-11-04 17:17:46 -0800138 if (table == null) {
Madan Jampani12390c12014-11-12 00:35:56 -0800139 results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
Madan Jampani08822c42014-11-04 17:17:46 -0800140 continue;
141 }
Yuta HIGUCHI3bd8cdc2014-11-05 19:11:44 -0800142 VersionedValue value = VersionedValue.copy(table.get(request.key()));
Madan Jampani12390c12014-11-12 00:35:56 -0800143 results.add(new ReadResult(ReadStatus.OK, request.tableName(), request.key(), value));
Madan Jampani08822c42014-11-04 17:17:46 -0800144 }
145 return results;
146 }
147
Madan Jampani12390c12014-11-12 00:35:56 -0800148 WriteStatus checkIfApplicable(WriteRequest request,
149 VersionedValue value) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800150
151 switch (request.type()) {
152 case PUT:
Madan Jampani12390c12014-11-12 00:35:56 -0800153 return WriteStatus.OK;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800154
155 case PUT_IF_ABSENT:
156 if (value == null) {
Madan Jampani12390c12014-11-12 00:35:56 -0800157 return WriteStatus.OK;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800158 }
Madan Jampani12390c12014-11-12 00:35:56 -0800159 return WriteStatus.PRECONDITION_VIOLATION;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800160 case PUT_IF_VALUE:
161 case REMOVE_IF_VALUE:
162 if (value != null && Arrays.equals(value.value(), request.oldValue())) {
Madan Jampani12390c12014-11-12 00:35:56 -0800163 return WriteStatus.OK;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800164 }
Madan Jampani12390c12014-11-12 00:35:56 -0800165 return WriteStatus.PRECONDITION_VIOLATION;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800166 case PUT_IF_VERSION:
167 case REMOVE_IF_VERSION:
168 if (value != null && request.previousVersion() == value.version()) {
Madan Jampani12390c12014-11-12 00:35:56 -0800169 return WriteStatus.OK;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800170 }
Madan Jampani12390c12014-11-12 00:35:56 -0800171 return WriteStatus.PRECONDITION_VIOLATION;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800172 case REMOVE:
Madan Jampani12390c12014-11-12 00:35:56 -0800173 return WriteStatus.OK;
Yuta HIGUCHIc6b8f612014-11-06 19:04:13 -0800174 default:
175 break;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800176 }
177 log.error("Should never reach here {}", request);
Madan Jampani12390c12014-11-12 00:35:56 -0800178 return WriteStatus.ABORTED;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800179 }
180
Madan Jampani08822c42014-11-04 17:17:46 -0800181 @Command
Madan Jampani12390c12014-11-12 00:35:56 -0800182 public List<WriteResult> write(BatchWriteRequest batchRequest) {
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800183
184 // applicability check
Madan Jampani08822c42014-11-04 17:17:46 -0800185 boolean abort = false;
Madan Jampani12390c12014-11-12 00:35:56 -0800186 List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
187 for (WriteRequest request : batchRequest.getAsList()) {
Madan Jampanidef2c652014-11-12 13:50:10 -0800188 Map<String, VersionedValue> table = state.getTable(request.tableName());
Madan Jampani08822c42014-11-04 17:17:46 -0800189 if (table == null) {
Madan Jampani12390c12014-11-12 00:35:56 -0800190 validationResults.add(WriteStatus.NO_SUCH_TABLE);
Madan Jampani08822c42014-11-04 17:17:46 -0800191 abort = true;
192 continue;
193 }
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800194 final VersionedValue value = table.get(request.key());
Madan Jampani12390c12014-11-12 00:35:56 -0800195 WriteStatus result = checkIfApplicable(request, value);
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800196 validationResults.add(result);
Madan Jampani12390c12014-11-12 00:35:56 -0800197 if (result != WriteStatus.OK) {
Madan Jampani08822c42014-11-04 17:17:46 -0800198 abort = true;
Madan Jampani08822c42014-11-04 17:17:46 -0800199 }
Madan Jampani08822c42014-11-04 17:17:46 -0800200 }
201
Madan Jampani12390c12014-11-12 00:35:56 -0800202 List<WriteResult> results = new ArrayList<>(batchRequest.batchSize());
Madan Jampani08822c42014-11-04 17:17:46 -0800203
204 if (abort) {
Madan Jampani12390c12014-11-12 00:35:56 -0800205 for (WriteStatus validationResult : validationResults) {
206 if (validationResult == WriteStatus.OK) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800207 // aborted due to applicability check failure on other request
Madan Jampani12390c12014-11-12 00:35:56 -0800208 results.add(new WriteResult(WriteStatus.ABORTED, null));
Madan Jampani08822c42014-11-04 17:17:46 -0800209 } else {
Madan Jampani12390c12014-11-12 00:35:56 -0800210 results.add(new WriteResult(validationResult, null));
Madan Jampani08822c42014-11-04 17:17:46 -0800211 }
212 }
213 return results;
214 }
215
Madan Jampani12390c12014-11-12 00:35:56 -0800216 List<TableModificationEvent> tableModificationEvents = Lists.newLinkedList();
217
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800218 // apply changes
Madan Jampani12390c12014-11-12 00:35:56 -0800219 for (WriteRequest request : batchRequest.getAsList()) {
Madan Jampanidef2c652014-11-12 13:50:10 -0800220 Map<String, VersionedValue> table = state.getTable(request.tableName());
Madan Jampani12390c12014-11-12 00:35:56 -0800221
222 TableModificationEvent tableModificationEvent = null;
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800223 // FIXME: If this method could be called by multiple thread,
224 // synchronization scope is wrong.
225 // Whole function including applicability check needs to be protected.
226 // Confirm copycat's thread safety requirement for StateMachine
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800227 // TODO: If we need isolation, we need to block reads also
Madan Jampani08822c42014-11-04 17:17:46 -0800228 synchronized (table) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800229 switch (request.type()) {
230 case PUT:
231 case PUT_IF_ABSENT:
232 case PUT_IF_VALUE:
233 case PUT_IF_VERSION:
234 VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
235 VersionedValue previousValue = table.put(request.key(), newValue);
Madan Jampani12390c12014-11-12 00:35:56 -0800236 WriteResult putResult = new WriteResult(WriteStatus.OK, previousValue);
237 results.add(putResult);
238 tableModificationEvent = (previousValue == null) ?
Madan Jampani9b37d572014-11-12 11:53:24 -0800239 TableModificationEvent.rowAdded(request.tableName(), request.key(), newValue) :
240 TableModificationEvent.rowUpdated(request.tableName(), request.key(), newValue);
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800241 break;
242
243 case REMOVE:
244 case REMOVE_IF_VALUE:
245 case REMOVE_IF_VERSION:
246 VersionedValue removedValue = table.remove(request.key());
Madan Jampani12390c12014-11-12 00:35:56 -0800247 WriteResult removeResult = new WriteResult(WriteStatus.OK, removedValue);
248 results.add(removeResult);
249 if (removedValue != null) {
250 tableModificationEvent =
Madan Jampani9b37d572014-11-12 11:53:24 -0800251 TableModificationEvent.rowDeleted(request.tableName(), request.key(), removedValue);
Madan Jampani12390c12014-11-12 00:35:56 -0800252 }
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800253 break;
254
255 default:
256 log.error("Invalid WriteRequest type {}", request.type());
257 break;
258 }
Madan Jampani08822c42014-11-04 17:17:46 -0800259 }
Madan Jampani12390c12014-11-12 00:35:56 -0800260
261 if (tableModificationEvent != null) {
262 tableModificationEvents.add(tableModificationEvent);
263 }
Madan Jampani08822c42014-11-04 17:17:46 -0800264 }
Madan Jampani12390c12014-11-12 00:35:56 -0800265
266 // notify listeners of table mod events.
267 for (DatabaseUpdateEventListener listener : listeners) {
268 for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
269 listener.tableModified(tableModificationEvent);
270 }
271 }
272
Madan Jampani08822c42014-11-04 17:17:46 -0800273 return results;
274 }
275
Madan Jampanidef2c652014-11-12 13:50:10 -0800276 public static class State {
Madan Jampani08822c42014-11-04 17:17:46 -0800277
Madan Jampanidef2c652014-11-12 13:50:10 -0800278 private final Map<String, TableMetadata> tableMetadata = Maps.newHashMap();
279 private final Map<String, Map<String, VersionedValue>> tableData = Maps.newHashMap();
Madan Jampani08822c42014-11-04 17:17:46 -0800280 private long versionCounter = 1;
281
Madan Jampanidef2c652014-11-12 13:50:10 -0800282 public Map<String, VersionedValue> getTable(String tableName) {
283 return tableData.get(tableName);
284 }
285
286 void createTable(TableMetadata metadata) {
287 tableMetadata.put(metadata.tableName, metadata);
288 tableData.put(metadata.tableName, Maps.newHashMap());
289 }
290
291 TableMetadata getTableMetadata(String tableName) {
292 return tableMetadata.get(tableName);
Madan Jampani08822c42014-11-04 17:17:46 -0800293 }
294
295 long nextVersion() {
296 return versionCounter++;
297 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800298
299 Set<String> getTableNames() {
300 return ImmutableSet.copyOf(tableMetadata.keySet());
301 }
302
303
304 boolean removeTable(String tableName) {
305 if (!tableMetadata.containsKey(tableName)) {
306 return false;
307 }
308 tableMetadata.remove(tableName);
309 tableData.remove(tableName);
310 return true;
311 }
312
313 void removeAllTables() {
314 tableMetadata.clear();
315 tableData.clear();
316 }
317 }
318
319 public static class TableMetadata {
320 private final String tableName;
321 private final boolean expireOldEntries;
322 private final int ttlMillis;
323
324 public TableMetadata(String tableName) {
325 this.tableName = tableName;
326 this.expireOldEntries = false;
327 this.ttlMillis = Integer.MAX_VALUE;
328
329 }
330
331 public TableMetadata(String tableName, int ttlMillis) {
332 this.tableName = tableName;
333 this.expireOldEntries = true;
334 this.ttlMillis = ttlMillis;
335 }
336
337 public String tableName() {
338 return tableName;
339 }
340
341 public boolean expireOldEntries() {
342 return expireOldEntries;
343 }
344
345 public int ttlMillis() {
346 return ttlMillis;
347 }
Madan Jampani08822c42014-11-04 17:17:46 -0800348 }
349
350 @Override
351 public byte[] takeSnapshot() {
352 try {
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -0800353 if (compressSnapshot) {
354 byte[] input = SERIALIZER.encode(state);
355 ByteArrayOutputStream comp = new ByteArrayOutputStream(input.length);
356 DeflaterOutputStream compressor = new DeflaterOutputStream(comp);
357 compressor.write(input, 0, input.length);
358 compressor.close();
359 return comp.toByteArray();
360 } else {
361 return SERIALIZER.encode(state);
362 }
Madan Jampani08822c42014-11-04 17:17:46 -0800363 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800364 log.error("Failed to take snapshot", e);
365 throw new SnapshotException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800366 }
367 }
368
369 @Override
370 public void installSnapshot(byte[] data) {
371 try {
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -0800372 if (compressSnapshot) {
373 ByteArrayInputStream in = new ByteArrayInputStream(data);
374 InflaterInputStream decompressor = new InflaterInputStream(in);
375 ByteStreams.toByteArray(decompressor);
376 this.state = SERIALIZER.decode(ByteStreams.toByteArray(decompressor));
377 } else {
378 this.state = SERIALIZER.decode(data);
379 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800380
381 // FIXME: synchronize.
382 for (DatabaseUpdateEventListener listener : listeners) {
383 listener.snapshotInstalled(state);
384 }
Madan Jampani08822c42014-11-04 17:17:46 -0800385 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800386 log.error("Failed to install from snapshot", e);
387 throw new SnapshotException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800388 }
389 }
Madan Jampani12390c12014-11-12 00:35:56 -0800390
Madan Jampanidef2c652014-11-12 13:50:10 -0800391 /**
392 * Adds specified DatabaseUpdateEventListener.
393 * @param listener listener to add
394 */
Madan Jampani12390c12014-11-12 00:35:56 -0800395 public void addEventListener(DatabaseUpdateEventListener listener) {
396 listeners.add(listener);
397 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800398
399 /**
400 * Removes specified DatabaseUpdateEventListener.
401 * @param listener listener to remove
402 */
403 public void removeEventListener(DatabaseUpdateEventListener listener) {
404 listeners.remove(listener);
405 }
Madan Jampani08822c42014-11-04 17:17:46 -0800406}