blob: f803bb8306b2db7a91f6ab2ef2bdef7e8ec7c82b [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.store.ecmap;
18
19import org.apache.commons.lang3.mutable.MutableBoolean;
20import org.mapdb.DB;
21import org.mapdb.DBMaker;
22import org.mapdb.Hasher;
23import org.mapdb.Serializer;
24import org.onosproject.store.Timestamp;
25import org.onosproject.store.impl.Timestamped;
26import org.onosproject.store.serializers.KryoSerializer;
27
28import java.io.File;
29import java.util.Map;
30import java.util.concurrent.ExecutorService;
31
32import static com.google.common.base.Preconditions.checkNotNull;
33
34/**
35 * MapDB based implementation of a persistent store.
36 */
37class MapDbPersistentStore<K, V> implements PersistentStore<K, V> {
38
39 private final ExecutorService executor;
40 private final KryoSerializer serializer;
41
42 private final DB database;
43
44 private final Map<byte[], byte[]> items;
45 private final Map<byte[], byte[]> tombstones;
46
47 /**
48 * Creates a new MapDB based persistent store.
49 *
50 * @param filename filename of the database on disk
51 * @param executor executor to use for tasks that write to the disk
52 * @param serializer serializer for keys and values
53 */
54 MapDbPersistentStore(String filename, ExecutorService executor,
55 KryoSerializer serializer) {
56 this.executor = checkNotNull(executor);
57 this.serializer = checkNotNull(serializer);
58
59 File databaseFile = new File(filename);
60
61 database = DBMaker.newFileDB(databaseFile).make();
62
63 items = database.createHashMap("items")
64 .keySerializer(Serializer.BYTE_ARRAY)
65 .valueSerializer(Serializer.BYTE_ARRAY)
66 .hasher(Hasher.BYTE_ARRAY)
67 .makeOrGet();
68
69 tombstones = database.createHashMap("tombstones")
70 .keySerializer(Serializer.BYTE_ARRAY)
71 .valueSerializer(Serializer.BYTE_ARRAY)
72 .hasher(Hasher.BYTE_ARRAY)
73 .makeOrGet();
74 }
75
76 @Override
77 public void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones) {
78 this.items.forEach((keyBytes, valueBytes) ->
79 items.put(serializer.decode(keyBytes),
80 serializer.decode(valueBytes)));
81
82 this.tombstones.forEach((keyBytes, valueBytes) ->
83 tombstones.put(serializer.decode(keyBytes),
84 serializer.decode(valueBytes)));
85 }
86
87 @Override
88 public void put(K key, V value, Timestamp timestamp) {
89 executor.submit(() -> putInternal(key, value, timestamp));
90 }
91
92 private void putInternal(K key, V value, Timestamp timestamp) {
93 byte[] keyBytes = serializer.encode(key);
94 byte[] removedBytes = tombstones.get(keyBytes);
95
96 Timestamp removed = removedBytes == null ? null :
97 serializer.decode(removedBytes);
98 if (removed != null && removed.isNewerThan(timestamp)) {
99 return;
100 }
101
102 final MutableBoolean updated = new MutableBoolean(false);
103
104 items.compute(keyBytes, (k, existingBytes) -> {
105 Timestamped<V> existing = existingBytes == null ? null :
106 serializer.decode(existingBytes);
107 if (existing != null && existing.isNewerThan(timestamp)) {
108 updated.setFalse();
109 return existingBytes;
110 } else {
111 updated.setTrue();
112 return serializer.encode(new Timestamped<>(value, timestamp));
113 }
114 });
115
116 boolean success = updated.booleanValue();
117
118 if (success && removed != null) {
119 tombstones.remove(keyBytes, removedBytes);
120 }
121
122 database.commit();
123 }
124
125 @Override
126 public void remove(K key, Timestamp timestamp) {
127 executor.submit(() -> removeInternal(key, timestamp));
128 }
129
130 private void removeInternal(K key, Timestamp timestamp) {
131 byte[] keyBytes = serializer.encode(key);
132
133 final MutableBoolean updated = new MutableBoolean(false);
134
135 items.compute(keyBytes, (k, existingBytes) -> {
136 Timestamp existing = existingBytes == null ? null :
137 serializer.decode(existingBytes);
138 if (existing != null && existing.isNewerThan(timestamp)) {
139 updated.setFalse();
140 return existingBytes;
141 } else {
142 updated.setTrue();
143 // remove from items map
144 return null;
145 }
146 });
147
148 if (!updated.booleanValue()) {
149 return;
150 }
151
152 byte[] timestampBytes = serializer.encode(timestamp);
153 byte[] removedBytes = tombstones.get(keyBytes);
154
155 Timestamp removedTimestamp = removedBytes == null ? null :
156 serializer.decode(removedBytes);
157 if (removedTimestamp == null) {
158 tombstones.putIfAbsent(keyBytes, timestampBytes);
159 } else if (timestamp.isNewerThan(removedTimestamp)) {
160 tombstones.replace(keyBytes, removedBytes, timestampBytes);
161 }
162
163 database.commit();
164 }
165
166}