blob: 663e0f09d0410b88f2f8b4c2eacbb7f74e1da604 [file] [log] [blame]
Madan Jampani778f7ad2014-11-05 22:46:15 -08001package org.onlab.onos.store.service.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4import static com.google.common.base.Preconditions.checkState;
Madan Jampani348a9fe2014-11-09 01:37:51 -08005import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani778f7ad2014-11-05 22:46:15 -08006
7import java.io.File;
8import java.io.IOException;
9import java.util.ArrayList;
10import java.util.Arrays;
11import java.util.List;
12import java.util.concurrent.ConcurrentNavigableMap;
13
14import net.kuujo.copycat.log.Entry;
15import net.kuujo.copycat.log.Log;
16import net.kuujo.copycat.log.LogIndexOutOfBoundsException;
17
18import org.mapdb.Atomic;
19import org.mapdb.BTreeMap;
20import org.mapdb.DB;
21import org.mapdb.DBMaker;
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -080022import org.mapdb.Serializer;
Madan Jampani778f7ad2014-11-05 22:46:15 -080023import org.mapdb.TxBlock;
24import org.mapdb.TxMaker;
25import org.onlab.onos.store.serializers.StoreSerializer;
Madan Jampani348a9fe2014-11-09 01:37:51 -080026import org.slf4j.Logger;
Madan Jampani778f7ad2014-11-05 22:46:15 -080027
28import com.google.common.collect.Lists;
29
30/**
31 * MapDB based log implementation.
32 */
33public class MapDBLog implements Log {
34
Madan Jampani348a9fe2014-11-09 01:37:51 -080035 private final Logger log = getLogger(getClass());
36
Madan Jampani778f7ad2014-11-05 22:46:15 -080037 private final File dbFile;
38 private TxMaker txMaker;
39 private final StoreSerializer serializer;
40 private static final String LOG_NAME = "log";
41 private static final String SIZE_FIELD_NAME = "size";
42
Madan Jampani2ee20002014-11-06 20:06:12 -080043 public MapDBLog(String dbFileName, StoreSerializer serializer) {
44 this.dbFile = new File(dbFileName);
Madan Jampani778f7ad2014-11-05 22:46:15 -080045 this.serializer = serializer;
46 }
47
48 @Override
49 public void open() throws IOException {
50 txMaker = DBMaker
51 .newFileDB(dbFile)
52 .makeTxMaker();
53 }
54
55 @Override
56 public void close() throws IOException {
57 assertIsOpen();
58 txMaker.close();
59 txMaker = null;
60 }
61
62 @Override
63 public boolean isOpen() {
64 return txMaker != null;
65 }
66
67 protected void assertIsOpen() {
68 checkState(isOpen(), "The log is not currently open.");
69 }
70
71 @Override
72 public long appendEntry(Entry entry) {
73 checkArgument(entry != null, "expecting non-null entry");
74 return appendEntries(entry).get(0);
75 }
76
77 @Override
78 public List<Long> appendEntries(Entry... entries) {
79 checkArgument(entries != null, "expecting non-null entries");
80 return appendEntries(Arrays.asList(entries));
81 }
82
83 @Override
84 public List<Long> appendEntries(List<Entry> entries) {
85 assertIsOpen();
86 checkArgument(entries != null, "expecting non-null entries");
87 final List<Long> indices = Lists.newArrayList();
88
89 txMaker.execute(new TxBlock() {
90 @Override
91 public void tx(DB db) {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -080092 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -080093 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
94 long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
95 for (Entry entry : entries) {
96 byte[] entryBytes = serializer.encode(entry);
97 log.put(nextIndex, entryBytes);
98 size.addAndGet(entryBytes.length);
99 indices.add(nextIndex);
100 nextIndex++;
101 }
102 }
103 });
104
105 return indices;
106 }
107
108 @Override
109 public boolean containsEntry(long index) {
110 assertIsOpen();
111 DB db = txMaker.makeTx();
112 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800113 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800114 return log.containsKey(index);
115 } finally {
116 db.close();
117 }
118 }
119
120 @Override
121 public void delete() throws IOException {
122 assertIsOpen();
123 txMaker.execute(new TxBlock() {
124 @Override
125 public void tx(DB db) {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800126 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800127 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
128 log.clear();
129 size.set(0);
130 }
131 });
132 }
133
134 @Override
135 public <T extends Entry> T firstEntry() {
136 assertIsOpen();
137 DB db = txMaker.makeTx();
138 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800139 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800140 return log.isEmpty() ? null : serializer.decode(log.firstEntry().getValue());
141 } finally {
142 db.close();
143 }
144 }
145
146 @Override
147 public long firstIndex() {
148 assertIsOpen();
149 DB db = txMaker.makeTx();
150 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800151 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800152 return log.isEmpty() ? 0 : log.firstKey();
153 } finally {
154 db.close();
155 }
156 }
157
158 @Override
159 public <T extends Entry> List<T> getEntries(long from, long to) {
160 assertIsOpen();
161 DB db = txMaker.makeTx();
162 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800163 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800164 if (log.isEmpty()) {
165 throw new LogIndexOutOfBoundsException("Log is empty");
166 } else if (from < log.firstKey()) {
167 throw new LogIndexOutOfBoundsException("From index out of bounds.");
168 } else if (to > log.lastKey()) {
169 throw new LogIndexOutOfBoundsException("To index out of bounds.");
170 }
171 List<T> entries = new ArrayList<>((int) (to - from + 1));
172 for (long i = from; i <= to; i++) {
173 T entry = serializer.decode(log.get(i));
174 entries.add(entry);
175 }
176 return entries;
177 } finally {
178 db.close();
179 }
180 }
181
182 @Override
183 public <T extends Entry> T getEntry(long index) {
184 assertIsOpen();
185 DB db = txMaker.makeTx();
186 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800187 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800188 byte[] entryBytes = log.get(index);
189 return entryBytes == null ? null : serializer.decode(entryBytes);
190 } finally {
191 db.close();
192 }
193 }
194
195 @Override
196 public boolean isEmpty() {
197 assertIsOpen();
198 DB db = txMaker.makeTx();
199 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800200 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800201 return log.isEmpty();
202 } finally {
203 db.close();
204 }
205 }
206
207 @Override
208 public <T extends Entry> T lastEntry() {
209 assertIsOpen();
210 DB db = txMaker.makeTx();
211 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800212 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800213 return log.isEmpty() ? null : serializer.decode(log.lastEntry().getValue());
214 } finally {
215 db.close();
216 }
217 }
218
219 @Override
220 public long lastIndex() {
221 assertIsOpen();
222 DB db = txMaker.makeTx();
223 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800224 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800225 return log.isEmpty() ? 0 : log.lastKey();
226 } finally {
227 db.close();
228 }
229 }
230
231 @Override
232 public void removeAfter(long index) {
233 assertIsOpen();
234 txMaker.execute(new TxBlock() {
235 @Override
236 public void tx(DB db) {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800237 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800238 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
239 long startIndex = index + 1;
240 long endIndex = log.lastKey();
241 for (long i = startIndex; i <= endIndex; ++i) {
242 byte[] entryBytes = log.remove(i);
243 size.addAndGet(-1L * entryBytes.length);
244 }
245 }
246 });
247 }
248
249 @Override
250 public long size() {
251 assertIsOpen();
252 DB db = txMaker.makeTx();
253 try {
254 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
255 return size.get();
256 } finally {
257 db.close();
258 }
259 }
260
261 @Override
262 public void sync() throws IOException {
263 assertIsOpen();
264 }
265
266 @Override
267 public void compact(long index, Entry entry) throws IOException {
268
269 assertIsOpen();
270 txMaker.execute(new TxBlock() {
271 @Override
272 public void tx(DB db) {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800273 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800274 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
275 ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
276 long deletedBytes = headMap.keySet().stream().mapToLong(i -> log.remove(i).length).sum();
277 size.addAndGet(-1 * deletedBytes);
278 byte[] entryBytes = serializer.encode(entry);
279 byte[] existingEntry = log.put(index, entryBytes);
Madan Jampani348a9fe2014-11-09 01:37:51 -0800280 if (existingEntry != null) {
281 size.addAndGet(entryBytes.length - existingEntry.length);
282 } else {
283 size.addAndGet(entryBytes.length);
284 }
Madan Jampani778f7ad2014-11-05 22:46:15 -0800285 db.compact();
286 }
287 });
288 }
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800289
290 private BTreeMap<Long, byte[]> getLogMap(DB db) {
291 return db.createTreeMap(LOG_NAME)
292 .valuesOutsideNodesEnable()
293 .keySerializerWrap(Serializer.LONG)
294 .valueSerializer(Serializer.BYTE_ARRAY)
295 .makeOrGet();
296 }
Madan Jampani2ee20002014-11-06 20:06:12 -0800297}