blob: 933e0eeaac881670e5a82eba8c351cbdb007b46f [file] [log] [blame]
Madan Jampani778f7ad2014-11-05 22:46:15 -08001package org.onlab.onos.store.service.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4import static com.google.common.base.Preconditions.checkState;
5
6import java.io.File;
7import java.io.IOException;
8import java.util.ArrayList;
9import java.util.Arrays;
10import java.util.List;
11import java.util.concurrent.ConcurrentNavigableMap;
12
13import net.kuujo.copycat.log.Entry;
14import net.kuujo.copycat.log.Log;
15import net.kuujo.copycat.log.LogIndexOutOfBoundsException;
16
17import org.mapdb.Atomic;
18import org.mapdb.BTreeMap;
19import org.mapdb.DB;
20import org.mapdb.DBMaker;
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -080021import org.mapdb.Serializer;
Madan Jampani778f7ad2014-11-05 22:46:15 -080022import org.mapdb.TxBlock;
23import org.mapdb.TxMaker;
24import org.onlab.onos.store.serializers.StoreSerializer;
25
26import com.google.common.collect.Lists;
27
28/**
29 * MapDB based log implementation.
30 */
31public class MapDBLog implements Log {
32
33 private final File dbFile;
34 private TxMaker txMaker;
35 private final StoreSerializer serializer;
36 private static final String LOG_NAME = "log";
37 private static final String SIZE_FIELD_NAME = "size";
38
Madan Jampani2ee20002014-11-06 20:06:12 -080039 public MapDBLog(String dbFileName, StoreSerializer serializer) {
40 this.dbFile = new File(dbFileName);
Madan Jampani778f7ad2014-11-05 22:46:15 -080041 this.serializer = serializer;
42 }
43
44 @Override
45 public void open() throws IOException {
46 txMaker = DBMaker
47 .newFileDB(dbFile)
48 .makeTxMaker();
49 }
50
51 @Override
52 public void close() throws IOException {
53 assertIsOpen();
54 txMaker.close();
55 txMaker = null;
56 }
57
58 @Override
59 public boolean isOpen() {
60 return txMaker != null;
61 }
62
63 protected void assertIsOpen() {
64 checkState(isOpen(), "The log is not currently open.");
65 }
66
67 @Override
68 public long appendEntry(Entry entry) {
69 checkArgument(entry != null, "expecting non-null entry");
70 return appendEntries(entry).get(0);
71 }
72
73 @Override
74 public List<Long> appendEntries(Entry... entries) {
75 checkArgument(entries != null, "expecting non-null entries");
76 return appendEntries(Arrays.asList(entries));
77 }
78
79 @Override
80 public List<Long> appendEntries(List<Entry> entries) {
81 assertIsOpen();
82 checkArgument(entries != null, "expecting non-null entries");
83 final List<Long> indices = Lists.newArrayList();
84
85 txMaker.execute(new TxBlock() {
86 @Override
87 public void tx(DB db) {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -080088 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -080089 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
90 long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
91 for (Entry entry : entries) {
92 byte[] entryBytes = serializer.encode(entry);
93 log.put(nextIndex, entryBytes);
94 size.addAndGet(entryBytes.length);
95 indices.add(nextIndex);
96 nextIndex++;
97 }
98 }
99 });
100
101 return indices;
102 }
103
104 @Override
105 public boolean containsEntry(long index) {
106 assertIsOpen();
107 DB db = txMaker.makeTx();
108 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800109 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800110 return log.containsKey(index);
111 } finally {
112 db.close();
113 }
114 }
115
116 @Override
117 public void delete() throws IOException {
118 assertIsOpen();
119 txMaker.execute(new TxBlock() {
120 @Override
121 public void tx(DB db) {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800122 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800123 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
124 log.clear();
125 size.set(0);
126 }
127 });
128 }
129
130 @Override
131 public <T extends Entry> T firstEntry() {
132 assertIsOpen();
133 DB db = txMaker.makeTx();
134 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800135 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800136 return log.isEmpty() ? null : serializer.decode(log.firstEntry().getValue());
137 } finally {
138 db.close();
139 }
140 }
141
142 @Override
143 public long firstIndex() {
144 assertIsOpen();
145 DB db = txMaker.makeTx();
146 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800147 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800148 return log.isEmpty() ? 0 : log.firstKey();
149 } finally {
150 db.close();
151 }
152 }
153
154 @Override
155 public <T extends Entry> List<T> getEntries(long from, long to) {
156 assertIsOpen();
157 DB db = txMaker.makeTx();
158 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800159 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800160 if (log.isEmpty()) {
161 throw new LogIndexOutOfBoundsException("Log is empty");
162 } else if (from < log.firstKey()) {
163 throw new LogIndexOutOfBoundsException("From index out of bounds.");
164 } else if (to > log.lastKey()) {
165 throw new LogIndexOutOfBoundsException("To index out of bounds.");
166 }
167 List<T> entries = new ArrayList<>((int) (to - from + 1));
168 for (long i = from; i <= to; i++) {
169 T entry = serializer.decode(log.get(i));
170 entries.add(entry);
171 }
172 return entries;
173 } finally {
174 db.close();
175 }
176 }
177
178 @Override
179 public <T extends Entry> T getEntry(long index) {
180 assertIsOpen();
181 DB db = txMaker.makeTx();
182 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800183 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800184 byte[] entryBytes = log.get(index);
185 return entryBytes == null ? null : serializer.decode(entryBytes);
186 } finally {
187 db.close();
188 }
189 }
190
191 @Override
192 public boolean isEmpty() {
193 assertIsOpen();
194 DB db = txMaker.makeTx();
195 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800196 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800197 return log.isEmpty();
198 } finally {
199 db.close();
200 }
201 }
202
203 @Override
204 public <T extends Entry> T lastEntry() {
205 assertIsOpen();
206 DB db = txMaker.makeTx();
207 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800208 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800209 return log.isEmpty() ? null : serializer.decode(log.lastEntry().getValue());
210 } finally {
211 db.close();
212 }
213 }
214
215 @Override
216 public long lastIndex() {
217 assertIsOpen();
218 DB db = txMaker.makeTx();
219 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800220 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800221 return log.isEmpty() ? 0 : log.lastKey();
222 } finally {
223 db.close();
224 }
225 }
226
227 @Override
228 public void removeAfter(long index) {
229 assertIsOpen();
230 txMaker.execute(new TxBlock() {
231 @Override
232 public void tx(DB db) {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800233 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800234 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
235 long startIndex = index + 1;
236 long endIndex = log.lastKey();
237 for (long i = startIndex; i <= endIndex; ++i) {
238 byte[] entryBytes = log.remove(i);
239 size.addAndGet(-1L * entryBytes.length);
240 }
241 }
242 });
243 }
244
245 @Override
246 public long size() {
247 assertIsOpen();
248 DB db = txMaker.makeTx();
249 try {
250 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
251 return size.get();
252 } finally {
253 db.close();
254 }
255 }
256
257 @Override
258 public void sync() throws IOException {
259 assertIsOpen();
260 }
261
262 @Override
263 public void compact(long index, Entry entry) throws IOException {
264
265 assertIsOpen();
266 txMaker.execute(new TxBlock() {
267 @Override
268 public void tx(DB db) {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800269 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800270 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
271 ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
272 long deletedBytes = headMap.keySet().stream().mapToLong(i -> log.remove(i).length).sum();
273 size.addAndGet(-1 * deletedBytes);
274 byte[] entryBytes = serializer.encode(entry);
275 byte[] existingEntry = log.put(index, entryBytes);
276 size.addAndGet(entryBytes.length - existingEntry.length);
277 db.compact();
278 }
279 });
280 }
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800281
282 private BTreeMap<Long, byte[]> getLogMap(DB db) {
283 return db.createTreeMap(LOG_NAME)
284 .valuesOutsideNodesEnable()
285 .keySerializerWrap(Serializer.LONG)
286 .valueSerializer(Serializer.BYTE_ARRAY)
287 .makeOrGet();
288 }
Madan Jampani2ee20002014-11-06 20:06:12 -0800289}