blob: ad6773ebeea08fb3c4903041ca08909feb08cb10 [file] [log] [blame]
Madan Jampani08822c42014-11-04 17:17:46 -08001package org.onlab.onos.store.service.impl;
2
3import java.util.ArrayList;
4import java.util.List;
5import java.util.Map;
6import java.util.Set;
7
8import net.kuujo.copycat.Command;
9import net.kuujo.copycat.Query;
10import net.kuujo.copycat.StateMachine;
11
12import org.onlab.onos.store.serializers.KryoSerializer;
13import org.onlab.onos.store.service.ReadRequest;
14import org.onlab.onos.store.service.ReadResult;
Madan Jampani37c2e702014-11-04 18:11:10 -080015import org.onlab.onos.store.service.VersionedValue;
Madan Jampani08822c42014-11-04 17:17:46 -080016import org.onlab.onos.store.service.WriteRequest;
17import org.onlab.onos.store.service.WriteResult;
18import org.onlab.util.KryoNamespace;
19
20import com.google.common.collect.Maps;
21
Madan Jampani686fa182014-11-04 23:16:27 -080022/**
23 * StateMachine whose transitions are coordinated/replicated
24 * by Raft consensus.
25 * Each Raft cluster member has a instance of this state machine that is
26 * independently updated in lock step once there is consensus
27 * on the next transition.
28 */
Madan Jampani08822c42014-11-04 17:17:46 -080029public class DatabaseStateMachine implements StateMachine {
30
31 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
32 @Override
33 protected void setupKryoPool() {
34 serializerPool = KryoNamespace.newBuilder()
35 .register(VersionedValue.class)
36 .register(State.class)
Madan Jampani9b19a822014-11-04 21:37:13 -080037 .register(ClusterMessagingProtocol.COMMON)
Madan Jampani08822c42014-11-04 17:17:46 -080038 .build()
39 .populate(1);
40 }
41 };
42
43 private State state = new State();
44
45 @Command
46 public boolean createTable(String tableName) {
47 return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
48 }
49
50 @Command
51 public boolean dropTable(String tableName) {
52 return state.getTables().remove(tableName) != null;
53 }
54
55 @Command
56 public boolean dropAllTables() {
57 state.getTables().clear();
58 return true;
59 }
60
61 @Query
62 public Set<String> listTables() {
63 return state.getTables().keySet();
64 }
65
66 @Query
67 public List<InternalReadResult> read(List<ReadRequest> requests) {
68 List<InternalReadResult> results = new ArrayList<>(requests.size());
69 for (ReadRequest request : requests) {
70 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
71 if (table == null) {
72 results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
73 continue;
74 }
75 VersionedValue value = table.get(request.key());
76 results.add(new InternalReadResult(
77 InternalReadResult.Status.OK,
78 new ReadResult(
79 request.tableName(),
80 request.key(),
81 value)));
82 }
83 return results;
84 }
85
86 @Command
87 public List<InternalWriteResult> write(List<WriteRequest> requests) {
88 boolean abort = false;
89 List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
90 for (WriteRequest request : requests) {
91 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
92 if (table == null) {
93 validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
94 abort = true;
95 continue;
96 }
97 VersionedValue value = table.get(request.key());
98 if (value == null) {
99 if (request.oldValue() != null) {
100 validationResults.add(InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH);
101 abort = true;
102 continue;
103 } else if (request.previousVersion() >= 0) {
104 validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
105 abort = true;
106 continue;
107 }
108 }
109 if (request.previousVersion() >= 0 && value.version() != request.previousVersion()) {
110 validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
111 abort = true;
112 continue;
113 }
114
115 validationResults.add(InternalWriteResult.Status.OK);
116 }
117
118 List<InternalWriteResult> results = new ArrayList<>(requests.size());
119
120 if (abort) {
121 for (InternalWriteResult.Status validationResult : validationResults) {
122 if (validationResult == InternalWriteResult.Status.OK) {
123 results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
124 } else {
125 results.add(new InternalWriteResult(validationResult, null));
126 }
127 }
128 return results;
129 }
130
131 for (WriteRequest request : requests) {
132 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
133 synchronized (table) {
134 VersionedValue previousValue =
135 table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
136 results.add(new InternalWriteResult(
137 InternalWriteResult.Status.OK,
138 new WriteResult(request.tableName(), request.key(), previousValue)));
139 }
140 }
141 return results;
142 }
143
144 public class State {
145
146 private final Map<String, Map<String, VersionedValue>> tables =
147 Maps.newHashMap();
148 private long versionCounter = 1;
149
150 Map<String, Map<String, VersionedValue>> getTables() {
151 return tables;
152 }
153
154 long nextVersion() {
155 return versionCounter++;
156 }
157 }
158
159 @Override
160 public byte[] takeSnapshot() {
161 try {
162 return SERIALIZER.encode(state);
163 } catch (Exception e) {
164 e.printStackTrace();
165 return null;
166 }
167 }
168
169 @Override
170 public void installSnapshot(byte[] data) {
171 try {
172 this.state = SERIALIZER.decode(data);
173 } catch (Exception e) {
174 e.printStackTrace();
175 }
176 }
177}