blob: 2822f25251ba525eed398dc15bbea4b8aaf43aee [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -08003import static org.slf4j.LoggerFactory.getLogger;
4
Madan Jampani08822c42014-11-04 17:17:46 -08005import java.util.ArrayList;
6import java.util.List;
7import java.util.Map;
8import java.util.Set;
9
10import net.kuujo.copycat.Command;
11import net.kuujo.copycat.Query;
12import net.kuujo.copycat.StateMachine;
13
14import org.onlab.onos.store.serializers.KryoSerializer;
15import org.onlab.onos.store.service.ReadRequest;
16import org.onlab.onos.store.service.ReadResult;
Madan Jampani37c2e702014-11-04 18:11:10 -080017import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080018import org.onlab.onos.store.service.WriteRequest;
19import org.onlab.onos.store.service.WriteResult;
20import org.onlab.util.KryoNamespace;
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080021import org.slf4j.Logger;
Madan Jampani08822c42014-11-04 17:17:46 -080022
23import com.google.common.collect.Maps;
24
Madan Jampani686fa182014-11-04 23:16:27 -080025/**
26 * StateMachine whose transitions are coordinated/replicated
27 * by Raft consensus.
28 * Each Raft cluster member has a instance of this state machine that is
29 * independently updated in lock step once there is consensus
30 * on the next transition.
31 */
Madan Jampani08822c42014-11-04 17:17:46 -080032public class DatabaseStateMachine implements StateMachine {
33
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -080034 private final Logger log = getLogger(getClass());
35
Madan Jampani08822c42014-11-04 17:17:46 -080036 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
37 @Override
38 protected void setupKryoPool() {
39 serializerPool = KryoNamespace.newBuilder()
40 .register(VersionedValue.class)
41 .register(State.class)
Madan Jampani9b19a822014-11-04 21:37:13 -080042 .register(ClusterMessagingProtocol.COMMON)
Madan Jampani08822c42014-11-04 17:17:46 -080043 .build()
44 .populate(1);
45 }
46 };
47
48 private State state = new State();
49
50 @Command
51 public boolean createTable(String tableName) {
52 return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
53 }
54
55 @Command
56 public boolean dropTable(String tableName) {
57 return state.getTables().remove(tableName) != null;
58 }
59
60 @Command
61 public boolean dropAllTables() {
62 state.getTables().clear();
63 return true;
64 }
65
66 @Query
67 public Set<String> listTables() {
68 return state.getTables().keySet();
69 }
70
71 @Query
72 public List<InternalReadResult> read(List<ReadRequest> requests) {
73 List<InternalReadResult> results = new ArrayList<>(requests.size());
74 for (ReadRequest request : requests) {
75 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
76 if (table == null) {
77 results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
78 continue;
79 }
80 VersionedValue value = table.get(request.key());
81 results.add(new InternalReadResult(
82 InternalReadResult.Status.OK,
83 new ReadResult(
84 request.tableName(),
85 request.key(),
86 value)));
87 }
88 return results;
89 }
90
91 @Command
92 public List<InternalWriteResult> write(List<WriteRequest> requests) {
93 boolean abort = false;
94 List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
95 for (WriteRequest request : requests) {
96 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
97 if (table == null) {
98 validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
99 abort = true;
100 continue;
101 }
102 VersionedValue value = table.get(request.key());
103 if (value == null) {
104 if (request.oldValue() != null) {
105 validationResults.add(InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH);
106 abort = true;
107 continue;
108 } else if (request.previousVersion() >= 0) {
109 validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
110 abort = true;
111 continue;
112 }
113 }
114 if (request.previousVersion() >= 0 && value.version() != request.previousVersion()) {
115 validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
116 abort = true;
117 continue;
118 }
119
120 validationResults.add(InternalWriteResult.Status.OK);
121 }
122
123 List<InternalWriteResult> results = new ArrayList<>(requests.size());
124
125 if (abort) {
126 for (InternalWriteResult.Status validationResult : validationResults) {
127 if (validationResult == InternalWriteResult.Status.OK) {
128 results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
129 } else {
130 results.add(new InternalWriteResult(validationResult, null));
131 }
132 }
133 return results;
134 }
135
136 for (WriteRequest request : requests) {
137 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
138 synchronized (table) {
139 VersionedValue previousValue =
140 table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
141 results.add(new InternalWriteResult(
142 InternalWriteResult.Status.OK,
143 new WriteResult(request.tableName(), request.key(), previousValue)));
144 }
145 }
146 return results;
147 }
148
149 public class State {
150
151 private final Map<String, Map<String, VersionedValue>> tables =
152 Maps.newHashMap();
153 private long versionCounter = 1;
154
155 Map<String, Map<String, VersionedValue>> getTables() {
156 return tables;
157 }
158
159 long nextVersion() {
160 return versionCounter++;
161 }
162 }
163
164 @Override
165 public byte[] takeSnapshot() {
166 try {
167 return SERIALIZER.encode(state);
168 } catch (Exception e) {
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -0800169 log.error("Snapshot serialization error", e);
Madan Jampani08822c42014-11-04 17:17:46 -0800170 return null;
171 }
172 }
173
174 @Override
175 public void installSnapshot(byte[] data) {
176 try {
177 this.state = SERIALIZER.decode(data);
178 } catch (Exception e) {
Yuta HIGUCHI39ae5502014-11-05 16:42:12 -0800179 log.error("Snapshot deserialization error", e);
Madan Jampani08822c42014-11-04 17:17:46 -0800180 }
181 }
182}