blob: d132b7cfc1b42f89b81b30202864268bcc0516e6 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -08003import static org.slf4j.LoggerFactory.getLogger;
4
Madan Jampani08822c42014-11-04 17:17:46 -08005import java.util.ArrayList;
6import java.util.List;
7import java.util.Map;
Madan Jampani08822c42014-11-04 17:17:46 -08008import net.kuujo.copycat.Command;
9import net.kuujo.copycat.Query;
10import net.kuujo.copycat.StateMachine;
11
12import org.onlab.onos.store.serializers.KryoSerializer;
13import org.onlab.onos.store.service.ReadRequest;
14import org.onlab.onos.store.service.ReadResult;
Madan Jampani37c2e702014-11-04 18:11:10 -080015import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080016import org.onlab.onos.store.service.WriteRequest;
17import org.onlab.onos.store.service.WriteResult;
18import org.onlab.util.KryoNamespace;
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080019import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080020
Yuta HIGUCHI1838f882014-11-05 18:42:00 -080021import com.google.common.collect.ImmutableList;
Madan Jampani08822c42014-11-04 17:17:46 -080022import com.google.common.collect.Maps;
23
Madan Jampani686fa182014-11-04 23:16:27 -080024/**
25 * StateMachine whose transitions are coordinated/replicated
26 * by Raft consensus.
27 * Each Raft cluster member has a instance of this state machine that is
28 * independently updated in lock step once there is consensus
29 * on the next transition.
30 */
Madan Jampani08822c42014-11-04 17:17:46 -080031public class DatabaseStateMachine implements StateMachine {
32
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080033 private final Logger log = getLogger(getClass());
34
Madan Jampani08822c42014-11-04 17:17:46 -080035 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
36 @Override
37 protected void setupKryoPool() {
38 serializerPool = KryoNamespace.newBuilder()
39 .register(VersionedValue.class)
40 .register(State.class)
Madan Jampani9b19a822014-11-04 21:37:13 -080041 .register(ClusterMessagingProtocol.COMMON)
Madan Jampani08822c42014-11-04 17:17:46 -080042 .build()
43 .populate(1);
44 }
45 };
46
47 private State state = new State();
48
49 @Command
50 public boolean createTable(String tableName) {
51 return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
52 }
53
54 @Command
55 public boolean dropTable(String tableName) {
56 return state.getTables().remove(tableName) != null;
57 }
58
59 @Command
60 public boolean dropAllTables() {
61 state.getTables().clear();
62 return true;
63 }
64
65 @Query
Yuta HIGUCHI1838f882014-11-05 18:42:00 -080066 public List<String> listTables() {
67 return ImmutableList.copyOf(state.getTables().keySet());
Madan Jampani08822c42014-11-04 17:17:46 -080068 }
69
70 @Query
71 public List<InternalReadResult> read(List<ReadRequest> requests) {
72 List<InternalReadResult> results = new ArrayList<>(requests.size());
73 for (ReadRequest request : requests) {
74 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
75 if (table == null) {
76 results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
77 continue;
78 }
Yuta HIGUCHI3bd8cdc2014-11-05 19:11:44 -080079 VersionedValue value = VersionedValue.copy(table.get(request.key()));
Madan Jampani08822c42014-11-04 17:17:46 -080080 results.add(new InternalReadResult(
81 InternalReadResult.Status.OK,
82 new ReadResult(
83 request.tableName(),
84 request.key(),
85 value)));
86 }
87 return results;
88 }
89
90 @Command
91 public List<InternalWriteResult> write(List<WriteRequest> requests) {
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -080092
93 // applicability check
Madan Jampani08822c42014-11-04 17:17:46 -080094 boolean abort = false;
95 List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
96 for (WriteRequest request : requests) {
97 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
98 if (table == null) {
99 validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
100 abort = true;
101 continue;
102 }
103 VersionedValue value = table.get(request.key());
104 if (value == null) {
105 if (request.oldValue() != null) {
106 validationResults.add(InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH);
107 abort = true;
108 continue;
109 } else if (request.previousVersion() >= 0) {
110 validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
111 abort = true;
112 continue;
113 }
114 }
115 if (request.previousVersion() >= 0 && value.version() != request.previousVersion()) {
116 validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
117 abort = true;
118 continue;
119 }
120
121 validationResults.add(InternalWriteResult.Status.OK);
122 }
123
124 List<InternalWriteResult> results = new ArrayList<>(requests.size());
125
126 if (abort) {
127 for (InternalWriteResult.Status validationResult : validationResults) {
128 if (validationResult == InternalWriteResult.Status.OK) {
129 results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
130 } else {
131 results.add(new InternalWriteResult(validationResult, null));
132 }
133 }
134 return results;
135 }
136
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800137 // apply changes
Madan Jampani08822c42014-11-04 17:17:46 -0800138 for (WriteRequest request : requests) {
139 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
Yuta HIGUCHIbddc81c42014-11-05 18:53:09 -0800140 // FIXME: If this method could be called by multiple thread,
141 // synchronization scope is wrong.
142 // Whole function including applicability check needs to be protected.
143 // Confirm copycat's thread safety requirement for StateMachine
Madan Jampani08822c42014-11-04 17:17:46 -0800144 synchronized (table) {
145 VersionedValue previousValue =
146 table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
147 results.add(new InternalWriteResult(
148 InternalWriteResult.Status.OK,
149 new WriteResult(request.tableName(), request.key(), previousValue)));
150 }
151 }
152 return results;
153 }
154
155 public class State {
156
157 private final Map<String, Map<String, VersionedValue>> tables =
158 Maps.newHashMap();
159 private long versionCounter = 1;
160
161 Map<String, Map<String, VersionedValue>> getTables() {
162 return tables;
163 }
164
165 long nextVersion() {
166 return versionCounter++;
167 }
168 }
169
170 @Override
171 public byte[] takeSnapshot() {
172 try {
173 return SERIALIZER.encode(state);
174 } catch (Exception e) {
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -0800175 log.error("Snapshot serialization error", e);
Madan Jampani08822c42014-11-04 17:17:46 -0800176 return null;
177 }
178 }
179
180 @Override
181 public void installSnapshot(byte[] data) {
182 try {
183 this.state = SERIALIZER.decode(data);
184 } catch (Exception e) {
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -0800185 log.error("Snapshot deserialization error", e);
Madan Jampani08822c42014-11-04 17:17:46 -0800186 }
187 }
188}