// blob: cbca729ea93314761762123e754edd892edc0f15
// blame: Madan Jampani, 08822c4, 2014-11-04 17:17:46 -0800
package org.onlab.onos.store.service.impl;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import net.kuujo.copycat.Command;
import net.kuujo.copycat.Query;
import net.kuujo.copycat.StateMachine;

import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.util.KryoNamespace;

import com.google.common.collect.Maps;

21public class DatabaseStateMachine implements StateMachine {
22
23 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
24 @Override
25 protected void setupKryoPool() {
26 serializerPool = KryoNamespace.newBuilder()
27 .register(VersionedValue.class)
28 .register(State.class)
29 .register(NettyProtocol.COMMON)
30 .build()
31 .populate(1);
32 }
33 };
34
35 private State state = new State();
36
37 @Command
38 public boolean createTable(String tableName) {
39 return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
40 }
41
42 @Command
43 public boolean dropTable(String tableName) {
44 return state.getTables().remove(tableName) != null;
45 }
46
47 @Command
48 public boolean dropAllTables() {
49 state.getTables().clear();
50 return true;
51 }
52
53 @Query
54 public Set<String> listTables() {
55 return state.getTables().keySet();
56 }
57
58 @Query
59 public List<InternalReadResult> read(List<ReadRequest> requests) {
60 List<InternalReadResult> results = new ArrayList<>(requests.size());
61 for (ReadRequest request : requests) {
62 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
63 if (table == null) {
64 results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
65 continue;
66 }
67 VersionedValue value = table.get(request.key());
68 results.add(new InternalReadResult(
69 InternalReadResult.Status.OK,
70 new ReadResult(
71 request.tableName(),
72 request.key(),
73 value)));
74 }
75 return results;
76 }
77
78 @Command
79 public List<InternalWriteResult> write(List<WriteRequest> requests) {
80 boolean abort = false;
81 List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
82 for (WriteRequest request : requests) {
83 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
84 if (table == null) {
85 validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
86 abort = true;
87 continue;
88 }
89 VersionedValue value = table.get(request.key());
90 if (value == null) {
91 if (request.oldValue() != null) {
92 validationResults.add(InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH);
93 abort = true;
94 continue;
95 } else if (request.previousVersion() >= 0) {
96 validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
97 abort = true;
98 continue;
99 }
100 }
101 if (request.previousVersion() >= 0 && value.version() != request.previousVersion()) {
102 validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
103 abort = true;
104 continue;
105 }
106
107 validationResults.add(InternalWriteResult.Status.OK);
108 }
109
110 List<InternalWriteResult> results = new ArrayList<>(requests.size());
111
112 if (abort) {
113 for (InternalWriteResult.Status validationResult : validationResults) {
114 if (validationResult == InternalWriteResult.Status.OK) {
115 results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
116 } else {
117 results.add(new InternalWriteResult(validationResult, null));
118 }
119 }
120 return results;
121 }
122
123 for (WriteRequest request : requests) {
124 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
125 synchronized (table) {
126 VersionedValue previousValue =
127 table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
128 results.add(new InternalWriteResult(
129 InternalWriteResult.Status.OK,
130 new WriteResult(request.tableName(), request.key(), previousValue)));
131 }
132 }
133 return results;
134 }
135
136 public class State {
137
138 private final Map<String, Map<String, VersionedValue>> tables =
139 Maps.newHashMap();
140 private long versionCounter = 1;
141
142 Map<String, Map<String, VersionedValue>> getTables() {
143 return tables;
144 }
145
146 long nextVersion() {
147 return versionCounter++;
148 }
149 }
150
151 @Override
152 public byte[] takeSnapshot() {
153 try {
154 return SERIALIZER.encode(state);
155 } catch (Exception e) {
156 e.printStackTrace();
157 return null;
158 }
159 }
160
161 @Override
162 public void installSnapshot(byte[] data) {
163 try {
164 this.state = SERIALIZER.decode(data);
165 } catch (Exception e) {
166 e.printStackTrace();
167 }
168 }
169}