/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.service.impl;
Madan Jampani778f7ad2014-11-05 22:46:15 -080017
18import static com.google.common.base.Preconditions.checkArgument;
19import static com.google.common.base.Preconditions.checkState;
Yuta HIGUCHIebaa1572014-11-09 19:14:31 -080020import static com.google.common.base.Verify.verifyNotNull;
Madan Jampani348a9fe2014-11-09 01:37:51 -080021import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani778f7ad2014-11-05 22:46:15 -080022
23import java.io.File;
24import java.io.IOException;
25import java.util.ArrayList;
26import java.util.Arrays;
Yuta HIGUCHIebaa1572014-11-09 19:14:31 -080027import java.util.Iterator;
Madan Jampani778f7ad2014-11-05 22:46:15 -080028import java.util.List;
Yuta HIGUCHIebaa1572014-11-09 19:14:31 -080029import java.util.Map;
Madan Jampani778f7ad2014-11-05 22:46:15 -080030import java.util.concurrent.ConcurrentNavigableMap;
31
32import net.kuujo.copycat.log.Entry;
33import net.kuujo.copycat.log.Log;
34import net.kuujo.copycat.log.LogIndexOutOfBoundsException;
35
36import org.mapdb.Atomic;
37import org.mapdb.BTreeMap;
38import org.mapdb.DB;
39import org.mapdb.DBMaker;
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -080040import org.mapdb.Serializer;
Madan Jampani778f7ad2014-11-05 22:46:15 -080041import org.mapdb.TxBlock;
42import org.mapdb.TxMaker;
Brian O'Connorabafb502014-12-02 22:26:20 -080043import org.onosproject.store.serializers.StoreSerializer;
Madan Jampani348a9fe2014-11-09 01:37:51 -080044import org.slf4j.Logger;
Madan Jampani778f7ad2014-11-05 22:46:15 -080045
Madan Jampani778f7ad2014-11-05 22:46:15 -080046/**
47 * MapDB based log implementation.
48 */
49public class MapDBLog implements Log {
50
Madan Jampani348a9fe2014-11-09 01:37:51 -080051 private final Logger log = getLogger(getClass());
52
Madan Jampani778f7ad2014-11-05 22:46:15 -080053 private final File dbFile;
54 private TxMaker txMaker;
55 private final StoreSerializer serializer;
56 private static final String LOG_NAME = "log";
57 private static final String SIZE_FIELD_NAME = "size";
58
Yuta HIGUCHI413573d2014-11-10 09:45:18 -080059 private int cacheSize = 256;
60
Madan Jampani2ee20002014-11-06 20:06:12 -080061 public MapDBLog(String dbFileName, StoreSerializer serializer) {
62 this.dbFile = new File(dbFileName);
Madan Jampani778f7ad2014-11-05 22:46:15 -080063 this.serializer = serializer;
64 }
65
66 @Override
67 public void open() throws IOException {
68 txMaker = DBMaker
69 .newFileDB(dbFile)
Madan Jampani13efefa2014-11-10 12:23:09 -080070 .mmapFileEnableIfSupported()
Yuta HIGUCHI413573d2014-11-10 09:45:18 -080071 .cacheSize(cacheSize)
Madan Jampani778f7ad2014-11-05 22:46:15 -080072 .makeTxMaker();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080073 log.info("Raft log file: {}", dbFile.getCanonicalPath());
Madan Jampani778f7ad2014-11-05 22:46:15 -080074 }
75
76 @Override
77 public void close() throws IOException {
78 assertIsOpen();
79 txMaker.close();
80 txMaker = null;
81 }
82
83 @Override
84 public boolean isOpen() {
85 return txMaker != null;
86 }
87
88 protected void assertIsOpen() {
89 checkState(isOpen(), "The log is not currently open.");
90 }
91
92 @Override
93 public long appendEntry(Entry entry) {
94 checkArgument(entry != null, "expecting non-null entry");
95 return appendEntries(entry).get(0);
96 }
97
98 @Override
99 public List<Long> appendEntries(Entry... entries) {
100 checkArgument(entries != null, "expecting non-null entries");
101 return appendEntries(Arrays.asList(entries));
102 }
103
104 @Override
Madan Jampania88d1f52014-11-14 16:45:24 -0800105 public synchronized List<Long> appendEntries(List<Entry> entries) {
Madan Jampani778f7ad2014-11-05 22:46:15 -0800106 assertIsOpen();
107 checkArgument(entries != null, "expecting non-null entries");
Yuta HIGUCHIebaa1572014-11-09 19:14:31 -0800108 final List<Long> indices = new ArrayList<>(entries.size());
Madan Jampani778f7ad2014-11-05 22:46:15 -0800109
110 txMaker.execute(new TxBlock() {
111 @Override
112 public void tx(DB db) {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800113 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800114 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
115 long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
Yuta HIGUCHIebaa1572014-11-09 19:14:31 -0800116 long addedBytes = 0;
Madan Jampani778f7ad2014-11-05 22:46:15 -0800117 for (Entry entry : entries) {
Yuta HIGUCHIa38d9952014-11-10 09:46:36 -0800118 byte[] entryBytes = verifyNotNull(serializer.encode(entry),
119 "Writing LogEntry %s failed", nextIndex);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800120 log.put(nextIndex, entryBytes);
Yuta HIGUCHIebaa1572014-11-09 19:14:31 -0800121 addedBytes += entryBytes.length;
Madan Jampani778f7ad2014-11-05 22:46:15 -0800122 indices.add(nextIndex);
123 nextIndex++;
124 }
Yuta HIGUCHIebaa1572014-11-09 19:14:31 -0800125 size.addAndGet(addedBytes);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800126 }
127 });
128
129 return indices;
130 }
131
132 @Override
133 public boolean containsEntry(long index) {
134 assertIsOpen();
135 DB db = txMaker.makeTx();
136 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800137 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800138 return log.containsKey(index);
139 } finally {
140 db.close();
141 }
142 }
143
144 @Override
145 public void delete() throws IOException {
146 assertIsOpen();
147 txMaker.execute(new TxBlock() {
148 @Override
149 public void tx(DB db) {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800150 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800151 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
152 log.clear();
153 size.set(0);
154 }
155 });
156 }
157
158 @Override
159 public <T extends Entry> T firstEntry() {
160 assertIsOpen();
161 DB db = txMaker.makeTx();
162 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800163 BTreeMap<Long, byte[]> log = getLogMap(db);
Yuta HIGUCHI1ec41662014-11-19 22:01:54 -0800164 return log.isEmpty() ? null : verifyNotNull(decodeEntry(log.firstEntry().getValue()));
Madan Jampani778f7ad2014-11-05 22:46:15 -0800165 } finally {
166 db.close();
167 }
168 }
169
170 @Override
171 public long firstIndex() {
172 assertIsOpen();
173 DB db = txMaker.makeTx();
174 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800175 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800176 return log.isEmpty() ? 0 : log.firstKey();
177 } finally {
178 db.close();
179 }
180 }
181
Yuta HIGUCHI1ec41662014-11-19 22:01:54 -0800182 private <T extends Entry> T decodeEntry(final byte[] bytes) {
183 if (bytes == null) {
184 return null;
185 }
186 return serializer.decode(bytes.clone());
187 }
188
Madan Jampani778f7ad2014-11-05 22:46:15 -0800189 @Override
190 public <T extends Entry> List<T> getEntries(long from, long to) {
191 assertIsOpen();
192 DB db = txMaker.makeTx();
193 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800194 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800195 if (log.isEmpty()) {
196 throw new LogIndexOutOfBoundsException("Log is empty");
197 } else if (from < log.firstKey()) {
198 throw new LogIndexOutOfBoundsException("From index out of bounds.");
199 } else if (to > log.lastKey()) {
200 throw new LogIndexOutOfBoundsException("To index out of bounds.");
201 }
202 List<T> entries = new ArrayList<>((int) (to - from + 1));
203 for (long i = from; i <= to; i++) {
Yuta HIGUCHI1ec41662014-11-19 22:01:54 -0800204 T entry = verifyNotNull(decodeEntry(log.get(i)), "LogEntry %s was null", i);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800205 entries.add(entry);
206 }
207 return entries;
208 } finally {
209 db.close();
210 }
211 }
212
213 @Override
214 public <T extends Entry> T getEntry(long index) {
215 assertIsOpen();
216 DB db = txMaker.makeTx();
217 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800218 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800219 byte[] entryBytes = log.get(index);
Yuta HIGUCHI1ec41662014-11-19 22:01:54 -0800220 return entryBytes == null ? null : verifyNotNull(decodeEntry(entryBytes),
Yuta HIGUCHIa38d9952014-11-10 09:46:36 -0800221 "LogEntry %s was null", index);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800222 } finally {
223 db.close();
224 }
225 }
226
227 @Override
228 public boolean isEmpty() {
229 assertIsOpen();
230 DB db = txMaker.makeTx();
231 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800232 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800233 return log.isEmpty();
234 } finally {
235 db.close();
236 }
237 }
238
239 @Override
240 public <T extends Entry> T lastEntry() {
241 assertIsOpen();
242 DB db = txMaker.makeTx();
243 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800244 BTreeMap<Long, byte[]> log = getLogMap(db);
Yuta HIGUCHI1ec41662014-11-19 22:01:54 -0800245 return log.isEmpty() ? null : verifyNotNull(decodeEntry(log.lastEntry().getValue()));
Madan Jampani778f7ad2014-11-05 22:46:15 -0800246 } finally {
247 db.close();
248 }
249 }
250
251 @Override
252 public long lastIndex() {
253 assertIsOpen();
254 DB db = txMaker.makeTx();
255 try {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800256 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800257 return log.isEmpty() ? 0 : log.lastKey();
258 } finally {
259 db.close();
260 }
261 }
262
263 @Override
264 public void removeAfter(long index) {
265 assertIsOpen();
266 txMaker.execute(new TxBlock() {
267 @Override
268 public void tx(DB db) {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800269 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800270 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
Yuta HIGUCHIebaa1572014-11-09 19:14:31 -0800271 long removedBytes = 0;
272 ConcurrentNavigableMap<Long, byte[]> tailMap = log.tailMap(index, false);
273 Iterator<Map.Entry<Long, byte[]>> it = tailMap.entrySet().iterator();
274 while (it.hasNext()) {
275 Map.Entry<Long, byte[]> entry = it.next();
276 removedBytes += entry.getValue().length;
277 it.remove();
Madan Jampani778f7ad2014-11-05 22:46:15 -0800278 }
Yuta HIGUCHIebaa1572014-11-09 19:14:31 -0800279 size.addAndGet(-removedBytes);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800280 }
281 });
282 }
283
284 @Override
285 public long size() {
286 assertIsOpen();
287 DB db = txMaker.makeTx();
288 try {
289 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
290 return size.get();
291 } finally {
292 db.close();
293 }
294 }
295
296 @Override
297 public void sync() throws IOException {
298 assertIsOpen();
299 }
300
301 @Override
302 public void compact(long index, Entry entry) throws IOException {
303
304 assertIsOpen();
305 txMaker.execute(new TxBlock() {
306 @Override
307 public void tx(DB db) {
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800308 BTreeMap<Long, byte[]> log = getLogMap(db);
Madan Jampani778f7ad2014-11-05 22:46:15 -0800309 Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
310 ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
Yuta HIGUCHIebaa1572014-11-09 19:14:31 -0800311 Iterator<Map.Entry<Long, byte[]>> it = headMap.entrySet().iterator();
312
313 long deletedBytes = 0;
314 while (it.hasNext()) {
315 Map.Entry<Long, byte[]> e = it.next();
316 deletedBytes += e.getValue().length;
317 it.remove();
318 }
319 size.addAndGet(-deletedBytes);
320 byte[] entryBytes = verifyNotNull(serializer.encode(entry));
Madan Jampani778f7ad2014-11-05 22:46:15 -0800321 byte[] existingEntry = log.put(index, entryBytes);
Madan Jampani348a9fe2014-11-09 01:37:51 -0800322 if (existingEntry != null) {
323 size.addAndGet(entryBytes.length - existingEntry.length);
324 } else {
325 size.addAndGet(entryBytes.length);
326 }
Madan Jampani778f7ad2014-11-05 22:46:15 -0800327 db.compact();
328 }
329 });
330 }
Yuta HIGUCHI94ecb892014-11-06 23:31:44 -0800331
332 private BTreeMap<Long, byte[]> getLogMap(DB db) {
333 return db.createTreeMap(LOG_NAME)
334 .valuesOutsideNodesEnable()
335 .keySerializerWrap(Serializer.LONG)
336 .valueSerializer(Serializer.BYTE_ARRAY)
337 .makeOrGet();
338 }
Madan Jampani2ee20002014-11-06 20:06:12 -0800339}