Add persistence option to ECMap
Work towards ONOS-1337
Change-Id: I24e6a42e2f8856b363e79786829c51344797b81b
diff --git a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java
index 779c329..75ac27d 100644
--- a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java
@@ -166,6 +166,16 @@
public EventuallyConsistentMapBuilder<K, V> withFasterConvergence();
/**
+ * Configure the map to persist data to disk.
+ * <p>
+ * The default behavior is no persistence
+ * </p>
+ *
+ * @return this EventuallyConsistentMapBuilder
+ */
+ public EventuallyConsistentMapBuilder<K, V> withPersistence();
+
+ /**
* Builds an eventually consistent map based on the configuration options
* supplied to this builder.
*
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index 2c9e4a9..7384988 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -65,7 +65,7 @@
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
- <version>1.0.6</version>
+ <version>1.0.7</version>
</dependency>
<dependency>
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
index 9d60143..23b219b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
@@ -51,6 +51,7 @@
private long antiEntropyPeriod = 5;
private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS;
private boolean convergeFaster = false;
+ private boolean persistent = false;
/**
* Creates a new eventually consistent map builder.
@@ -131,6 +132,12 @@
}
@Override
+ public EventuallyConsistentMapBuilder<K, V> withPersistence() {
+ persistent = true;
+ return this;
+ }
+
+ @Override
public EventuallyConsistentMap<K, V> build() {
checkNotNull(name, "name is a mandatory parameter");
checkNotNull(serializerBuilder, "serializerBuilder is a mandatory parameter");
@@ -148,6 +155,7 @@
tombstonesDisabled,
antiEntropyPeriod,
antiEntropyTimeUnit,
- convergeFaster);
+ convergeFaster,
+ persistent);
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 99eef3d..33e4251 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -93,7 +93,6 @@
= new CopyOnWriteArraySet<>();
private final ExecutorService executor;
-
private final ScheduledExecutorService backgroundExecutor;
private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
@@ -116,6 +115,9 @@
private static final int LOAD_WINDOW = 2;
private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
+ private final boolean persistent;
+ private final PersistentStore<K, V> persistentStore;
+
/**
* Creates a new eventually consistent map shared amongst multiple instances.
* <p>
@@ -140,9 +142,9 @@
* @param tombstonesDisabled true if this map should not maintain
* tombstones
* @param antiEntropyPeriod period that the anti-entropy task should run
- * in seconds
- * @param antiEntropyTimeUnit time unit for anti-entropy task scheduling
+ * @param antiEntropyTimeUnit time unit for anti-entropy period
* @param convergeFaster make anti-entropy try to converge faster
+ * @param persistent persist data to disk
*/
EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
@@ -156,7 +158,8 @@
boolean tombstonesDisabled,
long antiEntropyPeriod,
TimeUnit antiEntropyTimeUnit,
- boolean convergeFaster) {
+ boolean convergeFaster,
+ boolean persistent) {
items = new ConcurrentHashMap<>();
removedItems = new ConcurrentHashMap<>();
senderPending = Maps.newConcurrentMap();
@@ -195,6 +198,21 @@
newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
}
+ this.persistent = persistent;
+
+ if (this.persistent) {
+ String dataDirectory = System.getProperty("karaf.data", "./data");
+ String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
+
+ ExecutorService dbExecutor =
+ newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
+
+ persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
+ persistentStore.readInto(items, removedItems);
+ } else {
+ this.persistentStore = null;
+ }
+
if (backgroundExecutor != null) {
this.backgroundExecutor = backgroundExecutor;
} else {
@@ -232,6 +250,7 @@
.register(ArrayList.class)
.register(AntiEntropyAdvertisement.class)
.register(HashMap.class)
+ .register(Timestamped.class)
.build();
}
};
@@ -321,6 +340,11 @@
if (success && removed != null) {
removedItems.remove(key, removed);
}
+
+ if (success && persistent) {
+ persistentStore.put(key, value, timestamp);
+ }
+
return success;
}
@@ -357,24 +381,29 @@
// remove from items map
return null;
}
- });
+ });
if (updated.isFalse()) {
return false;
}
+ boolean updatedTombstone = false;
+
if (!tombstonesDisabled) {
Timestamp removedTimestamp = removedItems.get(key);
if (removedTimestamp == null) {
- return removedItems.putIfAbsent(key, timestamp) == null;
+ //Timestamp removed = removedItems.putIfAbsent(key, timestamp);
+ updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
} else if (timestamp.isNewerThan(removedTimestamp)) {
- return removedItems.replace(key, removedTimestamp, timestamp);
- } else {
- return false;
+ updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
}
}
- return updated.booleanValue();
+ if (updated.booleanValue() && persistent) {
+ persistentStore.remove(key, timestamp);
+ }
+
+ return (!tombstonesDisabled && updatedTombstone) || updated.booleanValue();
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java
new file mode 100644
index 0000000..f803bb8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.ecmap;
+
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.Hasher;
+import org.mapdb.Serializer;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.impl.Timestamped;
+import org.onosproject.store.serializers.KryoSerializer;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * MapDB based implementation of a persistent store.
+ */
+class MapDbPersistentStore<K, V> implements PersistentStore<K, V> {
+
+ private final ExecutorService executor;
+ private final KryoSerializer serializer;
+
+ private final DB database;
+
+ private final Map<byte[], byte[]> items;
+ private final Map<byte[], byte[]> tombstones;
+
+ /**
+ * Creates a new MapDB based persistent store.
+ *
+ * @param filename filename of the database on disk
+ * @param executor executor to use for tasks that write to the disk
+ * @param serializer serializer for keys and values
+ */
+ MapDbPersistentStore(String filename, ExecutorService executor,
+ KryoSerializer serializer) {
+ this.executor = checkNotNull(executor);
+ this.serializer = checkNotNull(serializer);
+
+ File databaseFile = new File(filename);
+
+ database = DBMaker.newFileDB(databaseFile).make();
+
+ items = database.createHashMap("items")
+ .keySerializer(Serializer.BYTE_ARRAY)
+ .valueSerializer(Serializer.BYTE_ARRAY)
+ .hasher(Hasher.BYTE_ARRAY)
+ .makeOrGet();
+
+ tombstones = database.createHashMap("tombstones")
+ .keySerializer(Serializer.BYTE_ARRAY)
+ .valueSerializer(Serializer.BYTE_ARRAY)
+ .hasher(Hasher.BYTE_ARRAY)
+ .makeOrGet();
+ }
+
+ @Override
+ public void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones) {
+ this.items.forEach((keyBytes, valueBytes) ->
+ items.put(serializer.decode(keyBytes),
+ serializer.decode(valueBytes)));
+
+ this.tombstones.forEach((keyBytes, valueBytes) ->
+ tombstones.put(serializer.decode(keyBytes),
+ serializer.decode(valueBytes)));
+ }
+
+ @Override
+ public void put(K key, V value, Timestamp timestamp) {
+ executor.submit(() -> putInternal(key, value, timestamp));
+ }
+
+ private void putInternal(K key, V value, Timestamp timestamp) {
+ byte[] keyBytes = serializer.encode(key);
+ byte[] removedBytes = tombstones.get(keyBytes);
+
+ Timestamp removed = removedBytes == null ? null :
+ serializer.decode(removedBytes);
+ if (removed != null && removed.isNewerThan(timestamp)) {
+ return;
+ }
+
+ final MutableBoolean updated = new MutableBoolean(false);
+
+ items.compute(keyBytes, (k, existingBytes) -> {
+ Timestamped<V> existing = existingBytes == null ? null :
+ serializer.decode(existingBytes);
+ if (existing != null && existing.isNewerThan(timestamp)) {
+ updated.setFalse();
+ return existingBytes;
+ } else {
+ updated.setTrue();
+ return serializer.encode(new Timestamped<>(value, timestamp));
+ }
+ });
+
+ boolean success = updated.booleanValue();
+
+ if (success && removed != null) {
+ tombstones.remove(keyBytes, removedBytes);
+ }
+
+ database.commit();
+ }
+
+ @Override
+ public void remove(K key, Timestamp timestamp) {
+ executor.submit(() -> removeInternal(key, timestamp));
+ }
+
+ private void removeInternal(K key, Timestamp timestamp) {
+ byte[] keyBytes = serializer.encode(key);
+
+ final MutableBoolean updated = new MutableBoolean(false);
+
+ items.compute(keyBytes, (k, existingBytes) -> {
+ Timestamp existing = existingBytes == null ? null :
+ serializer.decode(existingBytes);
+ if (existing != null && existing.isNewerThan(timestamp)) {
+ updated.setFalse();
+ return existingBytes;
+ } else {
+ updated.setTrue();
+ // remove from items map
+ return null;
+ }
+ });
+
+ if (!updated.booleanValue()) {
+ return;
+ }
+
+ byte[] timestampBytes = serializer.encode(timestamp);
+ byte[] removedBytes = tombstones.get(keyBytes);
+
+ Timestamp removedTimestamp = removedBytes == null ? null :
+ serializer.decode(removedBytes);
+ if (removedTimestamp == null) {
+ tombstones.putIfAbsent(keyBytes, timestampBytes);
+ } else if (timestamp.isNewerThan(removedTimestamp)) {
+ tombstones.replace(keyBytes, removedBytes, timestampBytes);
+ }
+
+ database.commit();
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java
new file mode 100644
index 0000000..b945f93
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.ecmap;
+
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.impl.Timestamped;
+
+import java.util.Map;
+
+/**
+ * A persistent store for an eventually consistent map.
+ */
+interface PersistentStore<K, V> {
+
+ /**
+ * Read the contents of the disk into the given maps.
+ *
+ * @param items items map
+ * @param tombstones tombstones map
+ */
+ void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones);
+
+ /**
+ * Puts a new key,value pair into the map on disk.
+ *
+ * @param key the key
+ * @param value the value
+ * @param timestamp the timestamp of the update
+ */
+ void put(K key, V value, Timestamp timestamp);
+
+ /**
+ * Removes a key from the map on disk.
+ *
+ * @param key the key
+ * @param timestamp the timestamp of the update
+ */
+ void remove(K key, Timestamp timestamp);
+}
diff --git a/features/features.xml b/features/features.xml
index a345962..a132cf2 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -64,7 +64,7 @@
<bundle>mvn:com.typesafe/config/1.2.1</bundle>
<bundle>mvn:org.onosproject/onlab-thirdparty/@ONOS-VERSION</bundle>
- <bundle>mvn:org.mapdb/mapdb/1.0.6</bundle>
+ <bundle>mvn:org.mapdb/mapdb/1.0.7</bundle>
</feature>
<feature name="onos-thirdparty-web" version="@FEATURE-VERSION"