package org.onlab.onos.store.service.impl;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;

import net.kuujo.copycat.log.Entry;
import net.kuujo.copycat.log.Log;
import net.kuujo.copycat.log.LogIndexOutOfBoundsException;

import org.mapdb.Atomic;
import org.mapdb.BTreeMap;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.TxBlock;
import org.mapdb.TxMaker;
import org.onlab.onos.store.serializers.StoreSerializer;

import com.google.common.collect.Lists;
26
27/**
28 * MapDB based log implementation.
29 */
30public class MapDBLog implements Log {
31
32 private final File dbFile;
33 private TxMaker txMaker;
34 private final StoreSerializer serializer;
35 private static final String LOG_NAME = "log";
36 private static final String SIZE_FIELD_NAME = "size";
37
38 public MapDBLog(File dbFile, StoreSerializer serializer) {
39 this.dbFile = dbFile;
40 this.serializer = serializer;
41 }
42
43 @Override
44 public void open() throws IOException {
45 txMaker = DBMaker
46 .newFileDB(dbFile)
47 .makeTxMaker();
48 }
49
50 @Override
51 public void close() throws IOException {
52 assertIsOpen();
53 txMaker.close();
54 txMaker = null;
55 }
56
57 @Override
58 public boolean isOpen() {
59 return txMaker != null;
60 }
61
62 protected void assertIsOpen() {
63 checkState(isOpen(), "The log is not currently open.");
64 }
65
66 @Override
67 public long appendEntry(Entry entry) {
68 checkArgument(entry != null, "expecting non-null entry");
69 return appendEntries(entry).get(0);
70 }
71
72 @Override
73 public List<Long> appendEntries(Entry... entries) {
74 checkArgument(entries != null, "expecting non-null entries");
75 return appendEntries(Arrays.asList(entries));
76 }
77
78 @Override
79 public List<Long> appendEntries(List<Entry> entries) {
80 assertIsOpen();
81 checkArgument(entries != null, "expecting non-null entries");
82 final List<Long> indices = Lists.newArrayList();
83
84 txMaker.execute(new TxBlock() {
85 @Override
86 public void tx(DB db) {
87 BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
88 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
89 long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
90 for (Entry entry : entries) {
91 byte[] entryBytes = serializer.encode(entry);
92 log.put(nextIndex, entryBytes);
93 size.addAndGet(entryBytes.length);
94 indices.add(nextIndex);
95 nextIndex++;
96 }
97 }
98 });
99
100 return indices;
101 }
102
103 @Override
104 public boolean containsEntry(long index) {
105 assertIsOpen();
106 DB db = txMaker.makeTx();
107 try {
108 BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
109 return log.containsKey(index);
110 } finally {
111 db.close();
112 }
113 }
114
115 @Override
116 public void delete() throws IOException {
117 assertIsOpen();
118 txMaker.execute(new TxBlock() {
119 @Override
120 public void tx(DB db) {
121 BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
122 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
123 log.clear();
124 size.set(0);
125 }
126 });
127 }
128
129 @Override
130 public <T extends Entry> T firstEntry() {
131 assertIsOpen();
132 DB db = txMaker.makeTx();
133 try {
134 BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
135 return log.isEmpty() ? null : serializer.decode(log.firstEntry().getValue());
136 } finally {
137 db.close();
138 }
139 }
140
141 @Override
142 public long firstIndex() {
143 assertIsOpen();
144 DB db = txMaker.makeTx();
145 try {
146 BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
147 return log.isEmpty() ? 0 : log.firstKey();
148 } finally {
149 db.close();
150 }
151 }
152
153 @Override
154 public <T extends Entry> List<T> getEntries(long from, long to) {
155 assertIsOpen();
156 DB db = txMaker.makeTx();
157 try {
158 BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
159 if (log.isEmpty()) {
160 throw new LogIndexOutOfBoundsException("Log is empty");
161 } else if (from < log.firstKey()) {
162 throw new LogIndexOutOfBoundsException("From index out of bounds.");
163 } else if (to > log.lastKey()) {
164 throw new LogIndexOutOfBoundsException("To index out of bounds.");
165 }
166 List<T> entries = new ArrayList<>((int) (to - from + 1));
167 for (long i = from; i <= to; i++) {
168 T entry = serializer.decode(log.get(i));
169 entries.add(entry);
170 }
171 return entries;
172 } finally {
173 db.close();
174 }
175 }
176
177 @Override
178 public <T extends Entry> T getEntry(long index) {
179 assertIsOpen();
180 DB db = txMaker.makeTx();
181 try {
182 BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
183 byte[] entryBytes = log.get(index);
184 return entryBytes == null ? null : serializer.decode(entryBytes);
185 } finally {
186 db.close();
187 }
188 }
189
190 @Override
191 public boolean isEmpty() {
192 assertIsOpen();
193 DB db = txMaker.makeTx();
194 try {
195 BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
196 return log.isEmpty();
197 } finally {
198 db.close();
199 }
200 }
201
202 @Override
203 public <T extends Entry> T lastEntry() {
204 assertIsOpen();
205 DB db = txMaker.makeTx();
206 try {
207 BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
208 return log.isEmpty() ? null : serializer.decode(log.lastEntry().getValue());
209 } finally {
210 db.close();
211 }
212 }
213
214 @Override
215 public long lastIndex() {
216 assertIsOpen();
217 DB db = txMaker.makeTx();
218 try {
219 BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
220 return log.isEmpty() ? 0 : log.lastKey();
221 } finally {
222 db.close();
223 }
224 }
225
226 @Override
227 public void removeAfter(long index) {
228 assertIsOpen();
229 txMaker.execute(new TxBlock() {
230 @Override
231 public void tx(DB db) {
232 BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
233 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
234 long startIndex = index + 1;
235 long endIndex = log.lastKey();
236 for (long i = startIndex; i <= endIndex; ++i) {
237 byte[] entryBytes = log.remove(i);
238 size.addAndGet(-1L * entryBytes.length);
239 }
240 }
241 });
242 }
243
244 @Override
245 public long size() {
246 assertIsOpen();
247 DB db = txMaker.makeTx();
248 try {
249 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
250 return size.get();
251 } finally {
252 db.close();
253 }
254 }
255
256 @Override
257 public void sync() throws IOException {
258 assertIsOpen();
259 }
260
261 @Override
262 public void compact(long index, Entry entry) throws IOException {
263
264 assertIsOpen();
265 txMaker.execute(new TxBlock() {
266 @Override
267 public void tx(DB db) {
268 BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
269 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
270 ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
271 long deletedBytes = headMap.keySet().stream().mapToLong(i -> log.remove(i).length).sum();
272 size.addAndGet(-1 * deletedBytes);
273 byte[] entryBytes = serializer.encode(entry);
274 byte[] existingEntry = log.put(index, entryBytes);
275 size.addAndGet(entryBytes.length - existingEntry.length);
276 db.compact();
277 }
278 });
279 }
280}