blob: 9ef9cb6297e22da323d179b82de3d7574891acf1 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -08003import static org.slf4j.LoggerFactory.getLogger;
4
Madan Jampani08822c42014-11-04 17:17:46 -08005import java.util.ArrayList;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -08006import java.util.Arrays;
Madan Jampani08822c42014-11-04 17:17:46 -08007import java.util.List;
8import java.util.Map;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -08009
Madan Jampani08822c42014-11-04 17:17:46 -080010import net.kuujo.copycat.Command;
11import net.kuujo.copycat.Query;
12import net.kuujo.copycat.StateMachine;
13
14import org.onlab.onos.store.serializers.KryoSerializer;
15import org.onlab.onos.store.service.ReadRequest;
16import org.onlab.onos.store.service.ReadResult;
Madan Jampani37c2e702014-11-04 18:11:10 -080017import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080018import org.onlab.onos.store.service.WriteRequest;
19import org.onlab.onos.store.service.WriteResult;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080020import org.onlab.onos.store.service.impl.InternalWriteResult.Status;
Madan Jampani08822c42014-11-04 17:17:46 -080021import org.onlab.util.KryoNamespace;
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080022import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080023
Yuta HIGUCHI1838f882014-11-05 18:42:00 -080024import com.google.common.collect.ImmutableList;
Madan Jampani08822c42014-11-04 17:17:46 -080025import com.google.common.collect.Maps;
26
Madan Jampani686fa182014-11-04 23:16:27 -080027/**
28 * StateMachine whose transitions are coordinated/replicated
29 * by Raft consensus.
30 * Each Raft cluster member has a instance of this state machine that is
31 * independently updated in lock step once there is consensus
32 * on the next transition.
33 */
Madan Jampani08822c42014-11-04 17:17:46 -080034public class DatabaseStateMachine implements StateMachine {
35
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080036 private final Logger log = getLogger(getClass());
37
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080038 // serializer used for snapshot
Madan Jampani08822c42014-11-04 17:17:46 -080039 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
40 @Override
41 protected void setupKryoPool() {
42 serializerPool = KryoNamespace.newBuilder()
43 .register(VersionedValue.class)
44 .register(State.class)
Madan Jampani9b19a822014-11-04 21:37:13 -080045 .register(ClusterMessagingProtocol.COMMON)
Madan Jampani08822c42014-11-04 17:17:46 -080046 .build()
47 .populate(1);
48 }
49 };
50
51 private State state = new State();
52
53 @Command
54 public boolean createTable(String tableName) {
55 return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
56 }
57
58 @Command
59 public boolean dropTable(String tableName) {
60 return state.getTables().remove(tableName) != null;
61 }
62
63 @Command
64 public boolean dropAllTables() {
65 state.getTables().clear();
66 return true;
67 }
68
69 @Query
Yuta HIGUCHI1838f882014-11-05 18:42:00 -080070 public List<String> listTables() {
71 return ImmutableList.copyOf(state.getTables().keySet());
Madan Jampani08822c42014-11-04 17:17:46 -080072 }
73
74 @Query
75 public List<InternalReadResult> read(List<ReadRequest> requests) {
76 List<InternalReadResult> results = new ArrayList<>(requests.size());
77 for (ReadRequest request : requests) {
78 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
79 if (table == null) {
80 results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
81 continue;
82 }
Yuta HIGUCHI3bd8cdc2014-11-05 19:11:44 -080083 VersionedValue value = VersionedValue.copy(table.get(request.key()));
Madan Jampani08822c42014-11-04 17:17:46 -080084 results.add(new InternalReadResult(
85 InternalReadResult.Status.OK,
86 new ReadResult(
87 request.tableName(),
88 request.key(),
89 value)));
90 }
91 return results;
92 }
93
Yuta HIGUCHI361664e2014-11-06 17:28:47 -080094 InternalWriteResult.Status checkIfApplicable(WriteRequest request,
95 VersionedValue value) {
96
97 switch (request.type()) {
98 case PUT:
99 return InternalWriteResult.Status.OK;
100
101 case PUT_IF_ABSENT:
102 if (value == null) {
103 return InternalWriteResult.Status.OK;
104 }
105 return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
106 case PUT_IF_VALUE:
107 case REMOVE_IF_VALUE:
108 if (value != null && Arrays.equals(value.value(), request.oldValue())) {
109 return InternalWriteResult.Status.OK;
110 }
111 return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
112 case PUT_IF_VERSION:
113 case REMOVE_IF_VERSION:
114 if (value != null && request.previousVersion() == value.version()) {
115 return InternalWriteResult.Status.OK;
116 }
117 return InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH;
118 case REMOVE:
119 return InternalWriteResult.Status.OK;
Yuta HIGUCHIc6b8f612014-11-06 19:04:13 -0800120 default:
121 break;
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800122 }
123 log.error("Should never reach here {}", request);
124 return InternalWriteResult.Status.ABORTED;
125 }
126
Madan Jampani08822c42014-11-04 17:17:46 -0800127 @Command
128 public List<InternalWriteResult> write(List<WriteRequest> requests) {
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800129
130 // applicability check
Madan Jampani08822c42014-11-04 17:17:46 -0800131 boolean abort = false;
132 List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
133 for (WriteRequest request : requests) {
134 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
135 if (table == null) {
136 validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
137 abort = true;
138 continue;
139 }
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800140 final VersionedValue value = table.get(request.key());
141 Status result = checkIfApplicable(request, value);
142 validationResults.add(result);
143 if (result != Status.OK) {
Madan Jampani08822c42014-11-04 17:17:46 -0800144 abort = true;
Madan Jampani08822c42014-11-04 17:17:46 -0800145 }
Madan Jampani08822c42014-11-04 17:17:46 -0800146 }
147
148 List<InternalWriteResult> results = new ArrayList<>(requests.size());
149
150 if (abort) {
151 for (InternalWriteResult.Status validationResult : validationResults) {
152 if (validationResult == InternalWriteResult.Status.OK) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800153 // aborted due to applicability check failure on other request
Madan Jampani08822c42014-11-04 17:17:46 -0800154 results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
155 } else {
156 results.add(new InternalWriteResult(validationResult, null));
157 }
158 }
159 return results;
160 }
161
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800162 // apply changes
Madan Jampani08822c42014-11-04 17:17:46 -0800163 for (WriteRequest request : requests) {
164 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800165 // FIXME: If this method could be called by multiple thread,
166 // synchronization scope is wrong.
167 // Whole function including applicability check needs to be protected.
168 // Confirm copycat's thread safety requirement for StateMachine
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800169 // TODO: If we need isolation, we need to block reads also
Madan Jampani08822c42014-11-04 17:17:46 -0800170 synchronized (table) {
Yuta HIGUCHI361664e2014-11-06 17:28:47 -0800171 switch (request.type()) {
172 case PUT:
173 case PUT_IF_ABSENT:
174 case PUT_IF_VALUE:
175 case PUT_IF_VERSION:
176 VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
177 VersionedValue previousValue = table.put(request.key(), newValue);
178 WriteResult putResult = new WriteResult(request.tableName(), request.key(), previousValue);
179 results.add(InternalWriteResult.ok(putResult));
180 break;
181
182 case REMOVE:
183 case REMOVE_IF_VALUE:
184 case REMOVE_IF_VERSION:
185 VersionedValue removedValue = table.remove(request.key());
186 WriteResult removeResult = new WriteResult(request.tableName(), request.key(), removedValue);
187 results.add(InternalWriteResult.ok(removeResult));
188 break;
189
190 default:
191 log.error("Invalid WriteRequest type {}", request.type());
192 break;
193 }
Madan Jampani08822c42014-11-04 17:17:46 -0800194 }
195 }
196 return results;
197 }
198
199 public class State {
200
201 private final Map<String, Map<String, VersionedValue>> tables =
202 Maps.newHashMap();
203 private long versionCounter = 1;
204
205 Map<String, Map<String, VersionedValue>> getTables() {
206 return tables;
207 }
208
209 long nextVersion() {
210 return versionCounter++;
211 }
212 }
213
214 @Override
215 public byte[] takeSnapshot() {
216 try {
217 return SERIALIZER.encode(state);
218 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800219 log.error("Failed to take snapshot", e);
220 throw new SnapshotException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800221 }
222 }
223
224 @Override
225 public void installSnapshot(byte[] data) {
226 try {
227 this.state = SERIALIZER.decode(data);
228 } catch (Exception e) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800229 log.error("Failed to install from snapshot", e);
230 throw new SnapshotException(e);
Madan Jampani08822c42014-11-04 17:17:46 -0800231 }
232 }
233}