blob: cdf66af054afa6bddb6ea77e4e9fb653af0a36e8 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -08003import static org.slf4j.LoggerFactory.getLogger;
4
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -08005import java.io.ByteArrayInputStream;
6import java.io.ByteArrayOutputStream;
Madan Jampani08822c42014-11-04 17:17:46 -08007import java.util.ArrayList;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -08008import java.util.Arrays;
Madan Jampani08822c42014-11-04 17:17:46 -08009import java.util.List;
10import java.util.Map;
Madan Jampani12390c12014-11-12 00:35:56 -080011import java.util.Set;
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -080012import java.util.zip.DeflaterOutputStream;
13import java.util.zip.InflaterInputStream;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080014
Madan Jampani08822c42014-11-04 17:17:46 -080015import net.kuujo.copycat.Command;
16import net.kuujo.copycat.Query;
17import net.kuujo.copycat.StateMachine;
18
Madan Jampani12390c12014-11-12 00:35:56 -080019import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampani08822c42014-11-04 17:17:46 -080020import org.onlab.onos.store.serializers.KryoSerializer;
Madan Jampani12390c12014-11-12 00:35:56 -080021import org.onlab.onos.store.service.BatchReadRequest;
22import org.onlab.onos.store.service.BatchWriteRequest;
Madan Jampani08822c42014-11-04 17:17:46 -080023import org.onlab.onos.store.service.ReadRequest;
24import org.onlab.onos.store.service.ReadResult;
Madan Jampani12390c12014-11-12 00:35:56 -080025import org.onlab.onos.store.service.ReadStatus;
Madan Jampani37c2e702014-11-04 18:11:10 -080026import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080027import org.onlab.onos.store.service.WriteRequest;
28import org.onlab.onos.store.service.WriteResult;
Madan Jampani12390c12014-11-12 00:35:56 -080029import org.onlab.onos.store.service.WriteStatus;
Madan Jampani08822c42014-11-04 17:17:46 -080030import org.onlab.util.KryoNamespace;
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080031import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080032
Madan Jampanif5d263b2014-11-13 10:04:40 -080033import com.google.common.base.MoreObjects;
Madan Jampanidef2c652014-11-12 13:50:10 -080034import com.google.common.collect.ImmutableSet;
Madan Jampani932c6ba2014-11-12 01:36:04 -080035import com.google.common.collect.Lists;
Madan Jampani08822c42014-11-04 17:17:46 -080036import com.google.common.collect.Maps;
Madan Jampanidef2c652014-11-12 13:50:10 -080037import com.google.common.collect.Sets;
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -080038import com.google.common.io.ByteStreams;
Madan Jampani08822c42014-11-04 17:17:46 -080039
/**
 * StateMachine whose transitions are coordinated/replicated
 * by Raft consensus.
 * Each Raft cluster member has an instance of this state machine that is
 * independently updated in lock step once there is consensus
 * on the next transition.
 */
Madan Jampani08822c42014-11-04 17:17:46 -080047public class DatabaseStateMachine implements StateMachine {
48
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080049 private final Logger log = getLogger(getClass());
50
Madan Jampani12390c12014-11-12 00:35:56 -080051 // message subject for database update notifications.
Madan Jampani44e6a542014-11-12 01:06:51 -080052 public static final MessageSubject DATABASE_UPDATE_EVENTS =
Madan Jampani12390c12014-11-12 00:35:56 -080053 new MessageSubject("database-update-events");
54
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080055 // serializer used for snapshot
Madan Jampani08822c42014-11-04 17:17:46 -080056 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
57 @Override
58 protected void setupKryoPool() {
59 serializerPool = KryoNamespace.newBuilder()
60 .register(VersionedValue.class)
61 .register(State.class)
Madan Jampanidef2c652014-11-12 13:50:10 -080062 .register(TableMetadata.class)
Madan Jampani12390c12014-11-12 00:35:56 -080063 .register(BatchReadRequest.class)
64 .register(BatchWriteRequest.class)
65 .register(ReadStatus.class)
66 .register(WriteStatus.class)
67 // TODO: Move this out ?
68 .register(TableModificationEvent.class)
Madan Jampanif5d263b2014-11-13 10:04:40 -080069 .register(TableModificationEvent.Type.class)
Madan Jampani9b19a822014-11-04 21:37:13 -080070 .register(ClusterMessagingProtocol.COMMON)
Madan Jampani08822c42014-11-04 17:17:46 -080071 .build()
72 .populate(1);
73 }
74 };
75
Madan Jampanidef2c652014-11-12 13:50:10 -080076 private final Set<DatabaseUpdateEventListener> listeners = Sets.newIdentityHashSet();
Madan Jampani12390c12014-11-12 00:35:56 -080077
78 // durable internal state of the database.
Madan Jampani08822c42014-11-04 17:17:46 -080079 private State state = new State();
80
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -080081 private boolean compressSnapshot = false;
82
Madan Jampani08822c42014-11-04 17:17:46 -080083 @Command
84 public boolean createTable(String tableName) {
Madan Jampanidef2c652014-11-12 13:50:10 -080085 TableMetadata metadata = new TableMetadata(tableName);
86 return createTable(metadata);
Madan Jampani12390c12014-11-12 00:35:56 -080087 }
88
89 @Command
Madan Jampanif5d263b2014-11-13 10:04:40 -080090 public boolean createTableWithExpiration(String tableName) {
91 int ttlMillis = 10000;
Madan Jampanidef2c652014-11-12 13:50:10 -080092 TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
93 return createTable(metadata);
94 }
95
96 private boolean createTable(TableMetadata metadata) {
97 Map<String, VersionedValue> existingTable = state.getTable(metadata.tableName());
98 if (existingTable != null) {
99 return false;
Madan Jampani12390c12014-11-12 00:35:56 -0800100 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800101 state.createTable(metadata);
102 for (DatabaseUpdateEventListener listener : listeners) {
103 listener.tableCreated(metadata);
104 }
105 return true;
Madan Jampani08822c42014-11-04 17:17:46 -0800106 }
107
108 @Command
109 public boolean dropTable(String tableName) {
Madan Jampanidef2c652014-11-12 13:50:10 -0800110 if (state.removeTable(tableName)) {
Madan Jampani12390c12014-11-12 00:35:56 -0800111 for (DatabaseUpdateEventListener listener : listeners) {
112 listener.tableDeleted(tableName);
113 }
114 return true;
115 }
116 return false;
Madan Jampani08822c42014-11-04 17:17:46 -0800117 }
118
119 @Command
120 public boolean dropAllTables() {
Madan Jampanidef2c652014-11-12 13:50:10 -0800121 Set<String> tableNames = state.getTableNames();
122 state.removeAllTables();
Madan Jampani12390c12014-11-12 00:35:56 -0800123 for (DatabaseUpdateEventListener listener : listeners) {
124 for (String tableName : tableNames) {
125 listener.tableDeleted(tableName);
126 }
127 }
Madan Jampani08822c42014-11-04 17:17:46 -0800128 return true;
129 }
130
131 @Query
Madan Jampanidef2c652014-11-12 13:50:10 -0800132 public Set<String> listTables() {
133 return ImmutableSet.copyOf(state.getTableNames());
Madan Jampani08822c42014-11-04 17:17:46 -0800134 }
135
136 @Query
Madan Jampani12390c12014-11-12 00:35:56 -0800137 public List<ReadResult> read(BatchReadRequest batchRequest) {
138 List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
139 for (ReadRequest request : batchRequest.getAsList()) {
Madan Jampanidef2c652014-11-12 13:50:10 -0800140 Map<String, VersionedValue> table = state.getTable(request.tableName());
Madan Jampani08822c42014-11-04 17:17:46 -0800141 if (table == null) {
Madan Jampani12390c12014-11-12 00:35:56 -0800142 results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
Madan Jampani08822c42014-11-04 17:17:46 -0800143 continue;
144 }
Yuta HIGUCHI3bd8cdc2014-11-05 19:11:44 -0800145 VersionedValue value = VersionedValue.copy(table.get(request.key()));
Madan Jampani12390c12014-11-12 00:35:56 -0800146 results.add(new ReadResult(ReadStatus.OK, request.tableName(), request.key(), value));
Madan Jampani08822c42014-11-04 17:17:46 -0800147 }
148 return results;
149 }
150
Madan Jampani12390c12014-11-12 00:35:56 -0800151 WriteStatus checkIfApplicable(WriteRequest request,
152 VersionedValue value) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800153
154 switch (request.type()) {
155 case PUT:
Madan Jampani12390c12014-11-12 00:35:56 -0800156 return WriteStatus.OK;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800157
158 case PUT_IF_ABSENT:
159 if (value == null) {
Madan Jampani12390c12014-11-12 00:35:56 -0800160 return WriteStatus.OK;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800161 }
Madan Jampani12390c12014-11-12 00:35:56 -0800162 return WriteStatus.PRECONDITION_VIOLATION;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800163 case PUT_IF_VALUE:
164 case REMOVE_IF_VALUE:
165 if (value != null && Arrays.equals(value.value(), request.oldValue())) {
Madan Jampani12390c12014-11-12 00:35:56 -0800166 return WriteStatus.OK;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800167 }
Madan Jampani12390c12014-11-12 00:35:56 -0800168 return WriteStatus.PRECONDITION_VIOLATION;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800169 case PUT_IF_VERSION:
170 case REMOVE_IF_VERSION:
171 if (value != null && request.previousVersion() == value.version()) {
Madan Jampani12390c12014-11-12 00:35:56 -0800172 return WriteStatus.OK;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800173 }
Madan Jampani12390c12014-11-12 00:35:56 -0800174 return WriteStatus.PRECONDITION_VIOLATION;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800175 case REMOVE:
Madan Jampani12390c12014-11-12 00:35:56 -0800176 return WriteStatus.OK;
Yuta HIGUCHIc6b8f612014-11-06 19:04:13 -0800177 default:
178 break;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800179 }
180 log.error("Should never reach here {}", request);
Madan Jampani12390c12014-11-12 00:35:56 -0800181 return WriteStatus.ABORTED;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800182 }
183
Madan Jampani08822c42014-11-04 17:17:46 -0800184 @Command
Madan Jampani12390c12014-11-12 00:35:56 -0800185 public List<WriteResult> write(BatchWriteRequest batchRequest) {
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800186
187 // applicability check
Madan Jampani08822c42014-11-04 17:17:46 -0800188 boolean abort = false;
Madan Jampani12390c12014-11-12 00:35:56 -0800189 List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
190 for (WriteRequest request : batchRequest.getAsList()) {
Madan Jampanidef2c652014-11-12 13:50:10 -0800191 Map<String, VersionedValue> table = state.getTable(request.tableName());
Madan Jampani08822c42014-11-04 17:17:46 -0800192 if (table == null) {
Madan Jampani12390c12014-11-12 00:35:56 -0800193 validationResults.add(WriteStatus.NO_SUCH_TABLE);
Madan Jampani08822c42014-11-04 17:17:46 -0800194 abort = true;
195 continue;
196 }
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800197 final VersionedValue value = table.get(request.key());
Madan Jampani12390c12014-11-12 00:35:56 -0800198 WriteStatus result = checkIfApplicable(request, value);
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800199 validationResults.add(result);
Madan Jampani12390c12014-11-12 00:35:56 -0800200 if (result != WriteStatus.OK) {
Madan Jampani08822c42014-11-04 17:17:46 -0800201 abort = true;
Madan Jampani08822c42014-11-04 17:17:46 -0800202 }
Madan Jampani08822c42014-11-04 17:17:46 -0800203 }
204
Madan Jampani12390c12014-11-12 00:35:56 -0800205 List<WriteResult> results = new ArrayList<>(batchRequest.batchSize());
Madan Jampani08822c42014-11-04 17:17:46 -0800206
207 if (abort) {
Madan Jampani12390c12014-11-12 00:35:56 -0800208 for (WriteStatus validationResult : validationResults) {
209 if (validationResult == WriteStatus.OK) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800210 // aborted due to applicability check failure on other request
Madan Jampani12390c12014-11-12 00:35:56 -0800211 results.add(new WriteResult(WriteStatus.ABORTED, null));
Madan Jampani08822c42014-11-04 17:17:46 -0800212 } else {
Madan Jampani12390c12014-11-12 00:35:56 -0800213 results.add(new WriteResult(validationResult, null));
Madan Jampani08822c42014-11-04 17:17:46 -0800214 }
215 }
216 return results;
217 }
218
Madan Jampani12390c12014-11-12 00:35:56 -0800219 List<TableModificationEvent> tableModificationEvents = Lists.newLinkedList();
220
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800221 // apply changes
Madan Jampani12390c12014-11-12 00:35:56 -0800222 for (WriteRequest request : batchRequest.getAsList()) {
Madan Jampanidef2c652014-11-12 13:50:10 -0800223 Map<String, VersionedValue> table = state.getTable(request.tableName());
Madan Jampani12390c12014-11-12 00:35:56 -0800224
225 TableModificationEvent tableModificationEvent = null;
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800226 // FIXME: If this method could be called by multiple thread,
227 // synchronization scope is wrong.
228 // Whole function including applicability check needs to be protected.
229 // Confirm copycat's thread safety requirement for StateMachine
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800230 // TODO: If we need isolation, we need to block reads also
Madan Jampani08822c42014-11-04 17:17:46 -0800231 synchronized (table) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800232 switch (request.type()) {
233 case PUT:
234 case PUT_IF_ABSENT:
235 case PUT_IF_VALUE:
236 case PUT_IF_VERSION:
237 VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
238 VersionedValue previousValue = table.put(request.key(), newValue);
Madan Jampani12390c12014-11-12 00:35:56 -0800239 WriteResult putResult = new WriteResult(WriteStatus.OK, previousValue);
240 results.add(putResult);
241 tableModificationEvent = (previousValue == null) ?
Madan Jampani9b37d572014-11-12 11:53:24 -0800242 TableModificationEvent.rowAdded(request.tableName(), request.key(), newValue) :
243 TableModificationEvent.rowUpdated(request.tableName(), request.key(), newValue);
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800244 break;
245
246 case REMOVE:
247 case REMOVE_IF_VALUE:
248 case REMOVE_IF_VERSION:
249 VersionedValue removedValue = table.remove(request.key());
Madan Jampani12390c12014-11-12 00:35:56 -0800250 WriteResult removeResult = new WriteResult(WriteStatus.OK, removedValue);
251 results.add(removeResult);
252 if (removedValue != null) {
253 tableModificationEvent =
Madan Jampani9b37d572014-11-12 11:53:24 -0800254 TableModificationEvent.rowDeleted(request.tableName(), request.key(), removedValue);
Madan Jampani12390c12014-11-12 00:35:56 -0800255 }
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800256 break;
257
258 default:
259 log.error("Invalid WriteRequest type {}", request.type());
260 break;
261 }
Madan Jampani08822c42014-11-04 17:17:46 -0800262 }
Madan Jampani12390c12014-11-12 00:35:56 -0800263
264 if (tableModificationEvent != null) {
265 tableModificationEvents.add(tableModificationEvent);
266 }
Madan Jampani08822c42014-11-04 17:17:46 -0800267 }
Madan Jampani12390c12014-11-12 00:35:56 -0800268
269 // notify listeners of table mod events.
270 for (DatabaseUpdateEventListener listener : listeners) {
271 for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800272 log.info("Publishing table modification event: {}", tableModificationEvent);
Madan Jampani12390c12014-11-12 00:35:56 -0800273 listener.tableModified(tableModificationEvent);
274 }
275 }
276
Madan Jampani08822c42014-11-04 17:17:46 -0800277 return results;
278 }
279
Madan Jampanidef2c652014-11-12 13:50:10 -0800280 public static class State {
Madan Jampani08822c42014-11-04 17:17:46 -0800281
Madan Jampanidef2c652014-11-12 13:50:10 -0800282 private final Map<String, TableMetadata> tableMetadata = Maps.newHashMap();
283 private final Map<String, Map<String, VersionedValue>> tableData = Maps.newHashMap();
Madan Jampani08822c42014-11-04 17:17:46 -0800284 private long versionCounter = 1;
285
Madan Jampanidef2c652014-11-12 13:50:10 -0800286 public Map<String, VersionedValue> getTable(String tableName) {
287 return tableData.get(tableName);
288 }
289
290 void createTable(TableMetadata metadata) {
291 tableMetadata.put(metadata.tableName, metadata);
292 tableData.put(metadata.tableName, Maps.newHashMap());
293 }
294
295 TableMetadata getTableMetadata(String tableName) {
296 return tableMetadata.get(tableName);
Madan Jampani08822c42014-11-04 17:17:46 -0800297 }
298
299 long nextVersion() {
300 return versionCounter++;
301 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800302
303 Set<String> getTableNames() {
304 return ImmutableSet.copyOf(tableMetadata.keySet());
305 }
306
307
308 boolean removeTable(String tableName) {
309 if (!tableMetadata.containsKey(tableName)) {
310 return false;
311 }
312 tableMetadata.remove(tableName);
313 tableData.remove(tableName);
314 return true;
315 }
316
317 void removeAllTables() {
318 tableMetadata.clear();
319 tableData.clear();
320 }
321 }
322
323 public static class TableMetadata {
324 private final String tableName;
325 private final boolean expireOldEntries;
326 private final int ttlMillis;
327
328 public TableMetadata(String tableName) {
329 this.tableName = tableName;
330 this.expireOldEntries = false;
331 this.ttlMillis = Integer.MAX_VALUE;
332
333 }
334
335 public TableMetadata(String tableName, int ttlMillis) {
336 this.tableName = tableName;
337 this.expireOldEntries = true;
338 this.ttlMillis = ttlMillis;
339 }
340
341 public String tableName() {
342 return tableName;
343 }
344
345 public boolean expireOldEntries() {
346 return expireOldEntries;
347 }
348
349 public int ttlMillis() {
350 return ttlMillis;
351 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800352
353 @Override
354 public String toString() {
355 return MoreObjects.toStringHelper(getClass())
356 .add("tableName", tableName)
357 .add("expireOldEntries", expireOldEntries)
358 .add("ttlMillis", ttlMillis)
359 .toString();
360 }
Madan Jampani08822c42014-11-04 17:17:46 -0800361 }
362
363 @Override
364 public byte[] takeSnapshot() {
365 try {
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -0800366 if (compressSnapshot) {
367 byte[] input = SERIALIZER.encode(state);
368 ByteArrayOutputStream comp = new ByteArrayOutputStream(input.length);
369 DeflaterOutputStream compressor = new DeflaterOutputStream(comp);
370 compressor.write(input, 0, input.length);
371 compressor.close();
372 return comp.toByteArray();
373 } else {
374 return SERIALIZER.encode(state);
375 }
Madan Jampani08822c42014-11-04 17:17:46 -0800376 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800377 log.error("Failed to take snapshot", e);
378 throw new SnapshotException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800379 }
380 }
381
382 @Override
383 public void installSnapshot(byte[] data) {
384 try {
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -0800385 if (compressSnapshot) {
386 ByteArrayInputStream in = new ByteArrayInputStream(data);
387 InflaterInputStream decompressor = new InflaterInputStream(in);
388 ByteStreams.toByteArray(decompressor);
389 this.state = SERIALIZER.decode(ByteStreams.toByteArray(decompressor));
390 } else {
391 this.state = SERIALIZER.decode(data);
392 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800393
394 // FIXME: synchronize.
395 for (DatabaseUpdateEventListener listener : listeners) {
396 listener.snapshotInstalled(state);
397 }
Madan Jampani08822c42014-11-04 17:17:46 -0800398 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800399 log.error("Failed to install from snapshot", e);
400 throw new SnapshotException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800401 }
402 }
Madan Jampani12390c12014-11-12 00:35:56 -0800403
Madan Jampanidef2c652014-11-12 13:50:10 -0800404 /**
405 * Adds specified DatabaseUpdateEventListener.
406 * @param listener listener to add
407 */
Madan Jampani12390c12014-11-12 00:35:56 -0800408 public void addEventListener(DatabaseUpdateEventListener listener) {
409 listeners.add(listener);
410 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800411
412 /**
413 * Removes specified DatabaseUpdateEventListener.
414 * @param listener listener to remove
415 */
416 public void removeEventListener(DatabaseUpdateEventListener listener) {
417 listeners.remove(listener);
418 }
Madan Jampani08822c42014-11-04 17:17:46 -0800419}