blob: 17b174c224aa8bf3a95e1106d44f65db04fccda8 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
import static org.slf4j.LoggerFactory.getLogger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

import net.kuujo.copycat.Command;
import net.kuujo.copycat.Query;
import net.kuujo.copycat.StateMachine;

import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
import org.onlab.onos.store.service.ReadStatus;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.onos.store.service.WriteStatus;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;

import com.beust.jcommander.internal.Lists;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
Madan Jampani08822c42014-11-04 17:17:46 -080037
Madan Jampani686fa182014-11-04 23:16:27 -080038/**
39 * StateMachine whose transitions are coordinated/replicated
40 * by Raft consensus.
41 * Each Raft cluster member has a instance of this state machine that is
42 * independently updated in lock step once there is consensus
43 * on the next transition.
44 */
Madan Jampani08822c42014-11-04 17:17:46 -080045public class DatabaseStateMachine implements StateMachine {
46
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080047 private final Logger log = getLogger(getClass());
48
Madan Jampani12390c12014-11-12 00:35:56 -080049 // message subject for database update notifications.
50 public static MessageSubject DATABASE_UPDATE_EVENTS =
51 new MessageSubject("database-update-events");
52
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080053 // serializer used for snapshot
Madan Jampani08822c42014-11-04 17:17:46 -080054 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
55 @Override
56 protected void setupKryoPool() {
57 serializerPool = KryoNamespace.newBuilder()
58 .register(VersionedValue.class)
59 .register(State.class)
Madan Jampani12390c12014-11-12 00:35:56 -080060 .register(BatchReadRequest.class)
61 .register(BatchWriteRequest.class)
62 .register(ReadStatus.class)
63 .register(WriteStatus.class)
64 // TODO: Move this out ?
65 .register(TableModificationEvent.class)
Madan Jampani9b19a822014-11-04 21:37:13 -080066 .register(ClusterMessagingProtocol.COMMON)
Madan Jampani08822c42014-11-04 17:17:46 -080067 .build()
68 .populate(1);
69 }
70 };
71
Madan Jampani12390c12014-11-12 00:35:56 -080072 private final List<DatabaseUpdateEventListener> listeners = Lists.newLinkedList();
73
74 // durable internal state of the database.
Madan Jampani08822c42014-11-04 17:17:46 -080075 private State state = new State();
76
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -080077 private boolean compressSnapshot = false;
78
Madan Jampani08822c42014-11-04 17:17:46 -080079 @Command
80 public boolean createTable(String tableName) {
Madan Jampani12390c12014-11-12 00:35:56 -080081 Map<String, VersionedValue> existingTable =
82 state.getTables().putIfAbsent(tableName, Maps.newHashMap());
83 if (existingTable == null) {
84 for (DatabaseUpdateEventListener listener : listeners) {
85 listener.tableCreated(tableName, Integer.MAX_VALUE);
86 }
87 return true;
88 }
89 return false;
90 }
91
92 @Command
93 public boolean createTable(String tableName, int expirationTimeMillis) {
94 Map<String, VersionedValue> existingTable =
95 state.getTables().putIfAbsent(tableName, Maps.newHashMap());
96 if (existingTable == null) {
97 for (DatabaseUpdateEventListener listener : listeners) {
98 listener.tableCreated(tableName, expirationTimeMillis);
99 }
100 return true;
101 }
102 return false;
Madan Jampani08822c42014-11-04 17:17:46 -0800103 }
104
105 @Command
106 public boolean dropTable(String tableName) {
Madan Jampani12390c12014-11-12 00:35:56 -0800107 Map<String, VersionedValue> table = state.getTables().remove(tableName);
108 if (table != null) {
109 for (DatabaseUpdateEventListener listener : listeners) {
110 listener.tableDeleted(tableName);
111 }
112 return true;
113 }
114 return false;
Madan Jampani08822c42014-11-04 17:17:46 -0800115 }
116
117 @Command
118 public boolean dropAllTables() {
Madan Jampani12390c12014-11-12 00:35:56 -0800119 Set<String> tableNames = state.getTables().keySet();
Madan Jampani08822c42014-11-04 17:17:46 -0800120 state.getTables().clear();
Madan Jampani12390c12014-11-12 00:35:56 -0800121 for (DatabaseUpdateEventListener listener : listeners) {
122 for (String tableName : tableNames) {
123 listener.tableDeleted(tableName);
124 }
125 }
Madan Jampani08822c42014-11-04 17:17:46 -0800126 return true;
127 }
128
129 @Query
Yuta HIGUCHI1838f882014-11-05 18:42:00 -0800130 public List<String> listTables() {
131 return ImmutableList.copyOf(state.getTables().keySet());
Madan Jampani08822c42014-11-04 17:17:46 -0800132 }
133
134 @Query
Madan Jampani12390c12014-11-12 00:35:56 -0800135 public List<ReadResult> read(BatchReadRequest batchRequest) {
136 List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
137 for (ReadRequest request : batchRequest.getAsList()) {
Madan Jampani08822c42014-11-04 17:17:46 -0800138 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
139 if (table == null) {
Madan Jampani12390c12014-11-12 00:35:56 -0800140 results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
Madan Jampani08822c42014-11-04 17:17:46 -0800141 continue;
142 }
Yuta HIGUCHI3bd8cdc2014-11-05 19:11:44 -0800143 VersionedValue value = VersionedValue.copy(table.get(request.key()));
Madan Jampani12390c12014-11-12 00:35:56 -0800144 results.add(new ReadResult(ReadStatus.OK, request.tableName(), request.key(), value));
Madan Jampani08822c42014-11-04 17:17:46 -0800145 }
146 return results;
147 }
148
Madan Jampani12390c12014-11-12 00:35:56 -0800149 WriteStatus checkIfApplicable(WriteRequest request,
150 VersionedValue value) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800151
152 switch (request.type()) {
153 case PUT:
Madan Jampani12390c12014-11-12 00:35:56 -0800154 return WriteStatus.OK;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800155
156 case PUT_IF_ABSENT:
157 if (value == null) {
Madan Jampani12390c12014-11-12 00:35:56 -0800158 return WriteStatus.OK;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800159 }
Madan Jampani12390c12014-11-12 00:35:56 -0800160 return WriteStatus.PRECONDITION_VIOLATION;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800161 case PUT_IF_VALUE:
162 case REMOVE_IF_VALUE:
163 if (value != null && Arrays.equals(value.value(), request.oldValue())) {
Madan Jampani12390c12014-11-12 00:35:56 -0800164 return WriteStatus.OK;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800165 }
Madan Jampani12390c12014-11-12 00:35:56 -0800166 return WriteStatus.PRECONDITION_VIOLATION;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800167 case PUT_IF_VERSION:
168 case REMOVE_IF_VERSION:
169 if (value != null && request.previousVersion() == value.version()) {
Madan Jampani12390c12014-11-12 00:35:56 -0800170 return WriteStatus.OK;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800171 }
Madan Jampani12390c12014-11-12 00:35:56 -0800172 return WriteStatus.PRECONDITION_VIOLATION;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800173 case REMOVE:
Madan Jampani12390c12014-11-12 00:35:56 -0800174 return WriteStatus.OK;
Yuta HIGUCHIc6b8f612014-11-06 19:04:13 -0800175 default:
176 break;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800177 }
178 log.error("Should never reach here {}", request);
Madan Jampani12390c12014-11-12 00:35:56 -0800179 return WriteStatus.ABORTED;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800180 }
181
Madan Jampani08822c42014-11-04 17:17:46 -0800182 @Command
Madan Jampani12390c12014-11-12 00:35:56 -0800183 public List<WriteResult> write(BatchWriteRequest batchRequest) {
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800184
185 // applicability check
Madan Jampani08822c42014-11-04 17:17:46 -0800186 boolean abort = false;
Madan Jampani12390c12014-11-12 00:35:56 -0800187 List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
188 for (WriteRequest request : batchRequest.getAsList()) {
Madan Jampani08822c42014-11-04 17:17:46 -0800189 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
190 if (table == null) {
Madan Jampani12390c12014-11-12 00:35:56 -0800191 validationResults.add(WriteStatus.NO_SUCH_TABLE);
Madan Jampani08822c42014-11-04 17:17:46 -0800192 abort = true;
193 continue;
194 }
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800195 final VersionedValue value = table.get(request.key());
Madan Jampani12390c12014-11-12 00:35:56 -0800196 WriteStatus result = checkIfApplicable(request, value);
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800197 validationResults.add(result);
Madan Jampani12390c12014-11-12 00:35:56 -0800198 if (result != WriteStatus.OK) {
Madan Jampani08822c42014-11-04 17:17:46 -0800199 abort = true;
Madan Jampani08822c42014-11-04 17:17:46 -0800200 }
Madan Jampani08822c42014-11-04 17:17:46 -0800201 }
202
Madan Jampani12390c12014-11-12 00:35:56 -0800203 List<WriteResult> results = new ArrayList<>(batchRequest.batchSize());
Madan Jampani08822c42014-11-04 17:17:46 -0800204
205 if (abort) {
Madan Jampani12390c12014-11-12 00:35:56 -0800206 for (WriteStatus validationResult : validationResults) {
207 if (validationResult == WriteStatus.OK) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800208 // aborted due to applicability check failure on other request
Madan Jampani12390c12014-11-12 00:35:56 -0800209 results.add(new WriteResult(WriteStatus.ABORTED, null));
Madan Jampani08822c42014-11-04 17:17:46 -0800210 } else {
Madan Jampani12390c12014-11-12 00:35:56 -0800211 results.add(new WriteResult(validationResult, null));
Madan Jampani08822c42014-11-04 17:17:46 -0800212 }
213 }
214 return results;
215 }
216
Madan Jampani12390c12014-11-12 00:35:56 -0800217 List<TableModificationEvent> tableModificationEvents = Lists.newLinkedList();
218
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800219 // apply changes
Madan Jampani12390c12014-11-12 00:35:56 -0800220 for (WriteRequest request : batchRequest.getAsList()) {
Madan Jampani08822c42014-11-04 17:17:46 -0800221 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
Madan Jampani12390c12014-11-12 00:35:56 -0800222
223 TableModificationEvent tableModificationEvent = null;
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800224 // FIXME: If this method could be called by multiple thread,
225 // synchronization scope is wrong.
226 // Whole function including applicability check needs to be protected.
227 // Confirm copycat's thread safety requirement for StateMachine
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800228 // TODO: If we need isolation, we need to block reads also
Madan Jampani08822c42014-11-04 17:17:46 -0800229 synchronized (table) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800230 switch (request.type()) {
231 case PUT:
232 case PUT_IF_ABSENT:
233 case PUT_IF_VALUE:
234 case PUT_IF_VERSION:
235 VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
236 VersionedValue previousValue = table.put(request.key(), newValue);
Madan Jampani12390c12014-11-12 00:35:56 -0800237 WriteResult putResult = new WriteResult(WriteStatus.OK, previousValue);
238 results.add(putResult);
239 tableModificationEvent = (previousValue == null) ?
240 TableModificationEvent.rowAdded(request.tableName(), request.key()) :
241 TableModificationEvent.rowUpdated(request.tableName(), request.key());
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800242 break;
243
244 case REMOVE:
245 case REMOVE_IF_VALUE:
246 case REMOVE_IF_VERSION:
247 VersionedValue removedValue = table.remove(request.key());
Madan Jampani12390c12014-11-12 00:35:56 -0800248 WriteResult removeResult = new WriteResult(WriteStatus.OK, removedValue);
249 results.add(removeResult);
250 if (removedValue != null) {
251 tableModificationEvent =
252 TableModificationEvent.rowDeleted(request.tableName(), request.key());
253 }
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800254 break;
255
256 default:
257 log.error("Invalid WriteRequest type {}", request.type());
258 break;
259 }
Madan Jampani08822c42014-11-04 17:17:46 -0800260 }
Madan Jampani12390c12014-11-12 00:35:56 -0800261
262 if (tableModificationEvent != null) {
263 tableModificationEvents.add(tableModificationEvent);
264 }
Madan Jampani08822c42014-11-04 17:17:46 -0800265 }
Madan Jampani12390c12014-11-12 00:35:56 -0800266
267 // notify listeners of table mod events.
268 for (DatabaseUpdateEventListener listener : listeners) {
269 for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
270 listener.tableModified(tableModificationEvent);
271 }
272 }
273
Madan Jampani08822c42014-11-04 17:17:46 -0800274 return results;
275 }
276
277 public class State {
278
279 private final Map<String, Map<String, VersionedValue>> tables =
280 Maps.newHashMap();
281 private long versionCounter = 1;
282
283 Map<String, Map<String, VersionedValue>> getTables() {
284 return tables;
285 }
286
287 long nextVersion() {
288 return versionCounter++;
289 }
290 }
291
292 @Override
293 public byte[] takeSnapshot() {
294 try {
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -0800295 if (compressSnapshot) {
296 byte[] input = SERIALIZER.encode(state);
297 ByteArrayOutputStream comp = new ByteArrayOutputStream(input.length);
298 DeflaterOutputStream compressor = new DeflaterOutputStream(comp);
299 compressor.write(input, 0, input.length);
300 compressor.close();
301 return comp.toByteArray();
302 } else {
303 return SERIALIZER.encode(state);
304 }
Madan Jampani08822c42014-11-04 17:17:46 -0800305 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800306 log.error("Failed to take snapshot", e);
307 throw new SnapshotException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800308 }
309 }
310
311 @Override
312 public void installSnapshot(byte[] data) {
313 try {
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -0800314 if (compressSnapshot) {
315 ByteArrayInputStream in = new ByteArrayInputStream(data);
316 InflaterInputStream decompressor = new InflaterInputStream(in);
317 ByteStreams.toByteArray(decompressor);
318 this.state = SERIALIZER.decode(ByteStreams.toByteArray(decompressor));
319 } else {
320 this.state = SERIALIZER.decode(data);
321 }
Madan Jampani08822c42014-11-04 17:17:46 -0800322 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800323 log.error("Failed to install from snapshot", e);
324 throw new SnapshotException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800325 }
326 }
Madan Jampani12390c12014-11-12 00:35:56 -0800327
328 public void addEventListener(DatabaseUpdateEventListener listener) {
329 listeners.add(listener);
330 }
Madan Jampani08822c42014-11-04 17:17:46 -0800331}