blob: 9c323f0f26bce6f3c1076ff84edabd9f0331dc3d [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -08003import static org.slf4j.LoggerFactory.getLogger;
4
Madan Jampani08822c42014-11-04 17:17:46 -08005import java.util.ArrayList;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -08006import java.util.Arrays;
Madan Jampani08822c42014-11-04 17:17:46 -08007import java.util.List;
8import java.util.Map;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -08009
Madan Jampani08822c42014-11-04 17:17:46 -080010import net.kuujo.copycat.Command;
11import net.kuujo.copycat.Query;
12import net.kuujo.copycat.StateMachine;
13
14import org.onlab.onos.store.serializers.KryoSerializer;
15import org.onlab.onos.store.service.ReadRequest;
16import org.onlab.onos.store.service.ReadResult;
Madan Jampani37c2e702014-11-04 18:11:10 -080017import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080018import org.onlab.onos.store.service.WriteRequest;
19import org.onlab.onos.store.service.WriteResult;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080020import org.onlab.onos.store.service.impl.InternalWriteResult.Status;
Madan Jampani08822c42014-11-04 17:17:46 -080021import org.onlab.util.KryoNamespace;
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080022import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080023
Yuta HIGUCHI1838f882014-11-05 18:42:00 -080024import com.google.common.collect.ImmutableList;
Madan Jampani08822c42014-11-04 17:17:46 -080025import com.google.common.collect.Maps;
26
Madan Jampani686fa182014-11-04 23:16:27 -080027/**
28 * StateMachine whose transitions are coordinated/replicated
29 * by Raft consensus.
30 * Each Raft cluster member has a instance of this state machine that is
31 * independently updated in lock step once there is consensus
32 * on the next transition.
33 */
Madan Jampani08822c42014-11-04 17:17:46 -080034public class DatabaseStateMachine implements StateMachine {
35
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080036 private final Logger log = getLogger(getClass());
37
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080038 // serializer used for snapshot
Madan Jampani08822c42014-11-04 17:17:46 -080039 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
40 @Override
41 protected void setupKryoPool() {
42 serializerPool = KryoNamespace.newBuilder()
43 .register(VersionedValue.class)
44 .register(State.class)
Madan Jampani9b19a822014-11-04 21:37:13 -080045 .register(ClusterMessagingProtocol.COMMON)
Madan Jampani08822c42014-11-04 17:17:46 -080046 .build()
47 .populate(1);
48 }
49 };
50
51 private State state = new State();
52
53 @Command
54 public boolean createTable(String tableName) {
55 return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
56 }
57
58 @Command
59 public boolean dropTable(String tableName) {
60 return state.getTables().remove(tableName) != null;
61 }
62
63 @Command
64 public boolean dropAllTables() {
65 state.getTables().clear();
66 return true;
67 }
68
69 @Query
Yuta HIGUCHI1838f882014-11-05 18:42:00 -080070 public List<String> listTables() {
71 return ImmutableList.copyOf(state.getTables().keySet());
Madan Jampani08822c42014-11-04 17:17:46 -080072 }
73
74 @Query
75 public List<InternalReadResult> read(List<ReadRequest> requests) {
76 List<InternalReadResult> results = new ArrayList<>(requests.size());
77 for (ReadRequest request : requests) {
78 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
79 if (table == null) {
80 results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
81 continue;
82 }
Yuta HIGUCHI3bd8cdc2014-11-05 19:11:44 -080083 VersionedValue value = VersionedValue.copy(table.get(request.key()));
Madan Jampani08822c42014-11-04 17:17:46 -080084 results.add(new InternalReadResult(
85 InternalReadResult.Status.OK,
86 new ReadResult(
87 request.tableName(),
88 request.key(),
89 value)));
90 }
91 return results;
92 }
93
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080094 InternalWriteResult.Status checkIfApplicable(WriteRequest request,
95 VersionedValue value) {
96
97 switch (request.type()) {
98 case PUT:
99 return InternalWriteResult.Status.OK;
100
101 case PUT_IF_ABSENT:
102 if (value == null) {
103 return InternalWriteResult.Status.OK;
104 }
105 return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
106 case PUT_IF_VALUE:
107 case REMOVE_IF_VALUE:
108 if (value != null && Arrays.equals(value.value(), request.oldValue())) {
109 return InternalWriteResult.Status.OK;
110 }
111 return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
112 case PUT_IF_VERSION:
113 case REMOVE_IF_VERSION:
114 if (value != null && request.previousVersion() == value.version()) {
115 return InternalWriteResult.Status.OK;
116 }
117 return InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH;
118 case REMOVE:
119 return InternalWriteResult.Status.OK;
120 }
121 log.error("Should never reach here {}", request);
122 return InternalWriteResult.Status.ABORTED;
123 }
124
Madan Jampani08822c42014-11-04 17:17:46 -0800125 @Command
126 public List<InternalWriteResult> write(List<WriteRequest> requests) {
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800127
128 // applicability check
Madan Jampani08822c42014-11-04 17:17:46 -0800129 boolean abort = false;
130 List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
131 for (WriteRequest request : requests) {
132 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
133 if (table == null) {
134 validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
135 abort = true;
136 continue;
137 }
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800138 final VersionedValue value = table.get(request.key());
139 Status result = checkIfApplicable(request, value);
140 validationResults.add(result);
141 if (result != Status.OK) {
Madan Jampani08822c42014-11-04 17:17:46 -0800142 abort = true;
Madan Jampani08822c42014-11-04 17:17:46 -0800143 }
Madan Jampani08822c42014-11-04 17:17:46 -0800144 }
145
146 List<InternalWriteResult> results = new ArrayList<>(requests.size());
147
148 if (abort) {
149 for (InternalWriteResult.Status validationResult : validationResults) {
150 if (validationResult == InternalWriteResult.Status.OK) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800151 // aborted due to applicability check failure on other request
Madan Jampani08822c42014-11-04 17:17:46 -0800152 results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
153 } else {
154 results.add(new InternalWriteResult(validationResult, null));
155 }
156 }
157 return results;
158 }
159
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800160 // apply changes
Madan Jampani08822c42014-11-04 17:17:46 -0800161 for (WriteRequest request : requests) {
162 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800163 // FIXME: If this method could be called by multiple thread,
164 // synchronization scope is wrong.
165 // Whole function including applicability check needs to be protected.
166 // Confirm copycat's thread safety requirement for StateMachine
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800167 // TODO: If we need isolation, we need to block reads also
Madan Jampani08822c42014-11-04 17:17:46 -0800168 synchronized (table) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800169 switch (request.type()) {
170 case PUT:
171 case PUT_IF_ABSENT:
172 case PUT_IF_VALUE:
173 case PUT_IF_VERSION:
174 VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
175 VersionedValue previousValue = table.put(request.key(), newValue);
176 WriteResult putResult = new WriteResult(request.tableName(), request.key(), previousValue);
177 results.add(InternalWriteResult.ok(putResult));
178 break;
179
180 case REMOVE:
181 case REMOVE_IF_VALUE:
182 case REMOVE_IF_VERSION:
183 VersionedValue removedValue = table.remove(request.key());
184 WriteResult removeResult = new WriteResult(request.tableName(), request.key(), removedValue);
185 results.add(InternalWriteResult.ok(removeResult));
186 break;
187
188 default:
189 log.error("Invalid WriteRequest type {}", request.type());
190 break;
191 }
Madan Jampani08822c42014-11-04 17:17:46 -0800192 }
193 }
194 return results;
195 }
196
197 public class State {
198
199 private final Map<String, Map<String, VersionedValue>> tables =
200 Maps.newHashMap();
201 private long versionCounter = 1;
202
203 Map<String, Map<String, VersionedValue>> getTables() {
204 return tables;
205 }
206
207 long nextVersion() {
208 return versionCounter++;
209 }
210 }
211
212 @Override
213 public byte[] takeSnapshot() {
214 try {
215 return SERIALIZER.encode(state);
216 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800217 log.error("Failed to take snapshot", e);
218 throw new SnapshotException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800219 }
220 }
221
222 @Override
223 public void installSnapshot(byte[] data) {
224 try {
225 this.state = SERIALIZER.decode(data);
226 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800227 log.error("Failed to install from snapshot", e);
228 throw new SnapshotException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800229 }
230 }
231}