Add persistence option to ECMap
Work towards ONOS-1337
Change-Id: I24e6a42e2f8856b363e79786829c51344797b81b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 99eef3d..33e4251 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -93,7 +93,6 @@
= new CopyOnWriteArraySet<>();
private final ExecutorService executor;
-
private final ScheduledExecutorService backgroundExecutor;
private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
@@ -116,6 +115,9 @@
private static final int LOAD_WINDOW = 2;
private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
+ private final boolean persistent;
+ private final PersistentStore<K, V> persistentStore;
+
/**
* Creates a new eventually consistent map shared amongst multiple instances.
* <p>
@@ -140,9 +142,9 @@
* @param tombstonesDisabled true if this map should not maintain
* tombstones
* @param antiEntropyPeriod period that the anti-entropy task should run
- * in seconds
- * @param antiEntropyTimeUnit time unit for anti-entropy task scheduling
+ * @param antiEntropyTimeUnit time unit for anti-entropy period
* @param convergeFaster make anti-entropy try to converge faster
+ * @param persistent persist data to disk
*/
EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
@@ -156,7 +158,8 @@
boolean tombstonesDisabled,
long antiEntropyPeriod,
TimeUnit antiEntropyTimeUnit,
- boolean convergeFaster) {
+ boolean convergeFaster,
+ boolean persistent) {
items = new ConcurrentHashMap<>();
removedItems = new ConcurrentHashMap<>();
senderPending = Maps.newConcurrentMap();
@@ -195,6 +198,21 @@
newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
}
+ this.persistent = persistent;
+
+ if (this.persistent) {
+ String dataDirectory = System.getProperty("karaf.data", "./data");
+ String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
+
+ ExecutorService dbExecutor =
+ newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
+
+ persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
+ persistentStore.readInto(items, removedItems);
+ } else {
+ this.persistentStore = null;
+ }
+
if (backgroundExecutor != null) {
this.backgroundExecutor = backgroundExecutor;
} else {
@@ -232,6 +250,7 @@
.register(ArrayList.class)
.register(AntiEntropyAdvertisement.class)
.register(HashMap.class)
+ .register(Timestamped.class)
.build();
}
};
@@ -321,6 +340,11 @@
if (success && removed != null) {
removedItems.remove(key, removed);
}
+
+ if (success && persistent) {
+ persistentStore.put(key, value, timestamp);
+ }
+
return success;
}
@@ -357,24 +381,29 @@
// remove from items map
return null;
}
- });
+ });
if (updated.isFalse()) {
return false;
}
+ boolean updatedTombstone = false;
+
if (!tombstonesDisabled) {
Timestamp removedTimestamp = removedItems.get(key);
if (removedTimestamp == null) {
- return removedItems.putIfAbsent(key, timestamp) == null;
+ //Timestamp removed = removedItems.putIfAbsent(key, timestamp);
+ updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
} else if (timestamp.isNewerThan(removedTimestamp)) {
- return removedItems.replace(key, removedTimestamp, timestamp);
- } else {
- return false;
+ updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
}
}
- return updated.booleanValue();
+ if (updated.booleanValue() && persistent) {
+ persistentStore.remove(key, timestamp);
+ }
+
+ return (!tombstonesDisabled && updatedTombstone) || updated.booleanValue();
}
@Override