blob: 50d12030345d948438383bee7cf7fdb231cfd34b [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -08003import static org.slf4j.LoggerFactory.getLogger;
4
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -08005import java.io.ByteArrayInputStream;
6import java.io.ByteArrayOutputStream;
Madan Jampani08822c42014-11-04 17:17:46 -08007import java.util.ArrayList;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -08008import java.util.Arrays;
Madan Jampani08822c42014-11-04 17:17:46 -08009import java.util.List;
10import java.util.Map;
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -080011import java.util.zip.DeflaterOutputStream;
12import java.util.zip.InflaterInputStream;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080013
Madan Jampani08822c42014-11-04 17:17:46 -080014import net.kuujo.copycat.Command;
15import net.kuujo.copycat.Query;
16import net.kuujo.copycat.StateMachine;
17
18import org.onlab.onos.store.serializers.KryoSerializer;
19import org.onlab.onos.store.service.ReadRequest;
20import org.onlab.onos.store.service.ReadResult;
Madan Jampani37c2e702014-11-04 18:11:10 -080021import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080022import org.onlab.onos.store.service.WriteRequest;
23import org.onlab.onos.store.service.WriteResult;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080024import org.onlab.onos.store.service.impl.InternalWriteResult.Status;
Madan Jampani08822c42014-11-04 17:17:46 -080025import org.onlab.util.KryoNamespace;
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080026import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080027
Yuta HIGUCHI1838f882014-11-05 18:42:00 -080028import com.google.common.collect.ImmutableList;
Madan Jampani08822c42014-11-04 17:17:46 -080029import com.google.common.collect.Maps;
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -080030import com.google.common.io.ByteStreams;
Madan Jampani08822c42014-11-04 17:17:46 -080031
Madan Jampani686fa182014-11-04 23:16:27 -080032/**
33 * StateMachine whose transitions are coordinated/replicated
34 * by Raft consensus.
35 * Each Raft cluster member has a instance of this state machine that is
36 * independently updated in lock step once there is consensus
37 * on the next transition.
38 */
Madan Jampani08822c42014-11-04 17:17:46 -080039public class DatabaseStateMachine implements StateMachine {
40
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080041 private final Logger log = getLogger(getClass());
42
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080043 // serializer used for snapshot
Madan Jampani08822c42014-11-04 17:17:46 -080044 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
45 @Override
46 protected void setupKryoPool() {
47 serializerPool = KryoNamespace.newBuilder()
48 .register(VersionedValue.class)
49 .register(State.class)
Madan Jampani9b19a822014-11-04 21:37:13 -080050 .register(ClusterMessagingProtocol.COMMON)
Madan Jampani08822c42014-11-04 17:17:46 -080051 .build()
52 .populate(1);
53 }
54 };
55
56 private State state = new State();
57
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -080058 private boolean compressSnapshot = false;
59
Madan Jampani08822c42014-11-04 17:17:46 -080060 @Command
61 public boolean createTable(String tableName) {
62 return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
63 }
64
65 @Command
66 public boolean dropTable(String tableName) {
67 return state.getTables().remove(tableName) != null;
68 }
69
70 @Command
71 public boolean dropAllTables() {
72 state.getTables().clear();
73 return true;
74 }
75
76 @Query
Yuta HIGUCHI1838f882014-11-05 18:42:00 -080077 public List<String> listTables() {
78 return ImmutableList.copyOf(state.getTables().keySet());
Madan Jampani08822c42014-11-04 17:17:46 -080079 }
80
81 @Query
82 public List<InternalReadResult> read(List<ReadRequest> requests) {
83 List<InternalReadResult> results = new ArrayList<>(requests.size());
84 for (ReadRequest request : requests) {
85 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
86 if (table == null) {
87 results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
88 continue;
89 }
Yuta HIGUCHI3bd8cdc2014-11-05 19:11:44 -080090 VersionedValue value = VersionedValue.copy(table.get(request.key()));
Madan Jampani08822c42014-11-04 17:17:46 -080091 results.add(new InternalReadResult(
92 InternalReadResult.Status.OK,
93 new ReadResult(
94 request.tableName(),
95 request.key(),
96 value)));
97 }
98 return results;
99 }
100
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800101 InternalWriteResult.Status checkIfApplicable(WriteRequest request,
102 VersionedValue value) {
103
104 switch (request.type()) {
105 case PUT:
106 return InternalWriteResult.Status.OK;
107
108 case PUT_IF_ABSENT:
109 if (value == null) {
110 return InternalWriteResult.Status.OK;
111 }
112 return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
113 case PUT_IF_VALUE:
114 case REMOVE_IF_VALUE:
115 if (value != null && Arrays.equals(value.value(), request.oldValue())) {
116 return InternalWriteResult.Status.OK;
117 }
118 return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
119 case PUT_IF_VERSION:
120 case REMOVE_IF_VERSION:
121 if (value != null && request.previousVersion() == value.version()) {
122 return InternalWriteResult.Status.OK;
123 }
124 return InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH;
125 case REMOVE:
126 return InternalWriteResult.Status.OK;
Yuta HIGUCHIc6b8f612014-11-06 19:04:13 -0800127 default:
128 break;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800129 }
130 log.error("Should never reach here {}", request);
131 return InternalWriteResult.Status.ABORTED;
132 }
133
Madan Jampani08822c42014-11-04 17:17:46 -0800134 @Command
135 public List<InternalWriteResult> write(List<WriteRequest> requests) {
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800136
137 // applicability check
Madan Jampani08822c42014-11-04 17:17:46 -0800138 boolean abort = false;
139 List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
140 for (WriteRequest request : requests) {
141 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
142 if (table == null) {
143 validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
144 abort = true;
145 continue;
146 }
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800147 final VersionedValue value = table.get(request.key());
148 Status result = checkIfApplicable(request, value);
149 validationResults.add(result);
150 if (result != Status.OK) {
Madan Jampani08822c42014-11-04 17:17:46 -0800151 abort = true;
Madan Jampani08822c42014-11-04 17:17:46 -0800152 }
Madan Jampani08822c42014-11-04 17:17:46 -0800153 }
154
155 List<InternalWriteResult> results = new ArrayList<>(requests.size());
156
157 if (abort) {
158 for (InternalWriteResult.Status validationResult : validationResults) {
159 if (validationResult == InternalWriteResult.Status.OK) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800160 // aborted due to applicability check failure on other request
Madan Jampani08822c42014-11-04 17:17:46 -0800161 results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
162 } else {
163 results.add(new InternalWriteResult(validationResult, null));
164 }
165 }
166 return results;
167 }
168
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800169 // apply changes
Madan Jampani08822c42014-11-04 17:17:46 -0800170 for (WriteRequest request : requests) {
171 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800172 // FIXME: If this method could be called by multiple thread,
173 // synchronization scope is wrong.
174 // Whole function including applicability check needs to be protected.
175 // Confirm copycat's thread safety requirement for StateMachine
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800176 // TODO: If we need isolation, we need to block reads also
Madan Jampani08822c42014-11-04 17:17:46 -0800177 synchronized (table) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800178 switch (request.type()) {
179 case PUT:
180 case PUT_IF_ABSENT:
181 case PUT_IF_VALUE:
182 case PUT_IF_VERSION:
183 VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
184 VersionedValue previousValue = table.put(request.key(), newValue);
185 WriteResult putResult = new WriteResult(request.tableName(), request.key(), previousValue);
186 results.add(InternalWriteResult.ok(putResult));
187 break;
188
189 case REMOVE:
190 case REMOVE_IF_VALUE:
191 case REMOVE_IF_VERSION:
192 VersionedValue removedValue = table.remove(request.key());
193 WriteResult removeResult = new WriteResult(request.tableName(), request.key(), removedValue);
194 results.add(InternalWriteResult.ok(removeResult));
195 break;
196
197 default:
198 log.error("Invalid WriteRequest type {}", request.type());
199 break;
200 }
Madan Jampani08822c42014-11-04 17:17:46 -0800201 }
202 }
203 return results;
204 }
205
206 public class State {
207
208 private final Map<String, Map<String, VersionedValue>> tables =
209 Maps.newHashMap();
210 private long versionCounter = 1;
211
212 Map<String, Map<String, VersionedValue>> getTables() {
213 return tables;
214 }
215
216 long nextVersion() {
217 return versionCounter++;
218 }
219 }
220
221 @Override
222 public byte[] takeSnapshot() {
223 try {
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -0800224 if (compressSnapshot) {
225 byte[] input = SERIALIZER.encode(state);
226 ByteArrayOutputStream comp = new ByteArrayOutputStream(input.length);
227 DeflaterOutputStream compressor = new DeflaterOutputStream(comp);
228 compressor.write(input, 0, input.length);
229 compressor.close();
230 return comp.toByteArray();
231 } else {
232 return SERIALIZER.encode(state);
233 }
Madan Jampani08822c42014-11-04 17:17:46 -0800234 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800235 log.error("Failed to take snapshot", e);
236 throw new SnapshotException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800237 }
238 }
239
240 @Override
241 public void installSnapshot(byte[] data) {
242 try {
Yuta HIGUCHIa7680a32014-11-06 22:17:37 -0800243 if (compressSnapshot) {
244 ByteArrayInputStream in = new ByteArrayInputStream(data);
245 InflaterInputStream decompressor = new InflaterInputStream(in);
246 ByteStreams.toByteArray(decompressor);
247 this.state = SERIALIZER.decode(ByteStreams.toByteArray(decompressor));
248 } else {
249 this.state = SERIALIZER.decode(data);
250 }
Madan Jampani08822c42014-11-04 17:17:46 -0800251 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800252 log.error("Failed to install from snapshot", e);
253 throw new SnapshotException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800254 }
255 }
256}