Notification support for Consistent datastructures (ConsitentMap and DistributedSet)
Change-Id: If74cdc411c79c42c7643420e6369cf656849bb6a
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 47031b0..a222627 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -42,19 +42,24 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+
+import static org.onlab.util.Tools.groupedThreads;
+
import org.onosproject.cluster.ClusterService;
import org.onosproject.core.IdGenerator;
import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapInfo;
import org.onosproject.store.service.PartitionInfo;
-import org.onosproject.store.service.SetBuilder;
+import org.onosproject.store.service.DistributedSetBuilder;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Transaction;
@@ -69,6 +74,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -93,12 +99,16 @@
private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
private ClusterCoordinator coordinator;
- private PartitionedDatabase partitionedDatabase;
- private Database inMemoryDatabase;
+ protected PartitionedDatabase partitionedDatabase;
+ protected Database inMemoryDatabase;
private TransactionManager transactionManager;
private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
+ private ExecutorService eventDispatcher;
+
+ private final Set<DefaultAsyncConsistentMap> maps = Sets.newCopyOnWriteArraySet();
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -187,7 +197,10 @@
Futures.getUnchecked(status);
- transactionManager = new TransactionManager(partitionedDatabase);
+ transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder());
+
+ eventDispatcher = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/manager", "map-event-dispatcher"));
log.info("Started");
}
@@ -213,13 +226,14 @@
log.info("Successfully closed databases.");
}
});
+ maps.forEach(map -> clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name())));
+ eventDispatcher.shutdown();
log.info("Stopped");
}
@Override
public TransactionContextBuilder transactionContextBuilder() {
- return new DefaultTransactionContextBuilder(
- inMemoryDatabase, partitionedDatabase, transactionIdGenerator.getNewId());
+ return new DefaultTransactionContextBuilder(this, transactionIdGenerator.getNewId());
}
@Override
@@ -296,12 +310,12 @@
@Override
public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
- return new DefaultConsistentMapBuilder<>(inMemoryDatabase, partitionedDatabase);
+ return new DefaultConsistentMapBuilder<>(this);
}
@Override
- public <E> SetBuilder<E> setBuilder() {
- return new DefaultSetBuilder<>(partitionedDatabase);
+ public <E> DistributedSetBuilder<E> setBuilder() {
+ return new DefaultDistributedSetBuilder<>(this);
}
@Override
@@ -370,4 +384,20 @@
public void redriveTransactions() {
getTransactions().stream().forEach(transactionManager::execute);
}
+
+ protected <K, V> void registerMap(DefaultAsyncConsistentMap<K, V> map) {
+ // TODO: Support different local instances of the same map.
+ if (!maps.add(map)) {
+ throw new IllegalStateException("Map by name " + map.name() + " already exists");
+ }
+
+ clusterCommunicator.<MapEvent<K, V>>addSubscriber(mapUpdatesSubject(map.name()),
+ map.serializer()::decode,
+ map::notifyLocalListeners,
+ eventDispatcher);
+ }
+
+ protected static MessageSubject mapUpdatesSubject(String mapName) {
+ return new MessageSubject(mapName + "-map-updates");
+ }
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 3fb6f4a..4bc93bc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -17,6 +17,7 @@
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.*;
+import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.Map;
@@ -24,8 +25,10 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -36,8 +39,11 @@
import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -56,6 +62,11 @@
private final Database database;
private final Serializer serializer;
private final boolean readOnly;
+ private final Consumer<MapEvent<K, V>> eventPublisher;
+
+ private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
+
+ private final Logger log = getLogger(getClass());
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
@@ -77,11 +88,29 @@
public DefaultAsyncConsistentMap(String name,
Database database,
Serializer serializer,
- boolean readOnly) {
+ boolean readOnly,
+ Consumer<MapEvent<K, V>> eventPublisher) {
this.name = checkNotNull(name, "map name cannot be null");
this.database = checkNotNull(database, "database cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.readOnly = readOnly;
+ this.eventPublisher = eventPublisher;
+ }
+
+ /**
+ * Returns this map name.
+ * @return map name
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the serializer for map entries.
+ * @return map entry serializer
+ */
+ public Serializer serializer() {
+ return serializer;
}
@Override
@@ -139,6 +168,7 @@
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(condition, "predicate function cannot be null");
checkNotNull(remappingFunction, "Remapping function cannot be null");
+ AtomicReference<MapEvent<K, V>> mapEvent = new AtomicReference<>();
return get(key).thenCompose(r1 -> {
V existingValue = r1 == null ? null : r1.value();
// if the condition evaluates to false, return existing value.
@@ -160,6 +190,7 @@
if (r1 != null) {
return remove(key, r1.version()).thenApply(result -> {
if (result) {
+ mapEvent.set(new MapEvent<>(name, MapEvent.Type.REMOVE, key, r1));
return null;
} else {
throw new ConsistentMapException.ConcurrentModification();
@@ -174,6 +205,7 @@
return replaceAndGet(key, r1.version(), computedValue.get())
.thenApply(v -> {
if (v.isPresent()) {
+ mapEvent.set(new MapEvent<>(name, MapEvent.Type.UPDATE, key, v.get()));
return v.get();
} else {
throw new ConsistentMapException.ConcurrentModification();
@@ -184,12 +216,13 @@
if (!result.isPresent()) {
throw new ConsistentMapException.ConcurrentModification();
} else {
+ mapEvent.set(new MapEvent<>(name, MapEvent.Type.INSERT, key, result.get()));
return result.get();
}
});
}
}
- });
+ }).whenComplete((result, error) -> notifyListeners(mapEvent.get()));
}
@Override
@@ -370,4 +403,35 @@
throw new UnsupportedOperationException();
}
}
+
+ @Override
+ public void addListener(MapEventListener<K, V> listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(MapEventListener<K, V> listener) {
+ listeners.remove(listener);
+ }
+
+ protected void notifyListeners(MapEvent<K, V> event) {
+ try {
+ if (event != null) {
+ notifyLocalListeners(event);
+ notifyRemoteListeners(event);
+ }
+ } catch (Exception e) {
+ log.warn("Failure notifying listeners about {}", event, e);
+ }
+ }
+
+ protected void notifyLocalListeners(MapEvent<K, V> event) {
+ listeners.forEach(listener -> listener.event(event));
+ }
+
+ protected void notifyRemoteListeners(MapEvent<K, V> event) {
+ if (eventPublisher != null) {
+ eventPublisher.accept(event);
+ }
+ }
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
index b85dfa2..7995d8f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
@@ -28,10 +28,9 @@
import java.util.function.Predicate;
import java.util.Set;
-import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
-import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Versioned;
/**
@@ -45,13 +44,14 @@
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
- private final AsyncConsistentMap<K, V> asyncMap;
+ private final DefaultAsyncConsistentMap<K, V> asyncMap;
- public DefaultConsistentMap(String name,
- Database database,
- Serializer serializer,
- boolean readOnly) {
- asyncMap = new DefaultAsyncConsistentMap<>(name, database, serializer, readOnly);
+ public String name() {
+ return asyncMap.name();
+ }
+
+ public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) {
+ this.asyncMap = asyncMap;
}
@Override
@@ -190,4 +190,14 @@
}
}
}
+
+ @Override
+ public void addListener(MapEventListener<K, V> listener) {
+ asyncMap.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(MapEventListener<K, V> listener) {
+ asyncMap.addListener(listener);
+ }
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
index 534b59f..24db74f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
@@ -6,6 +6,7 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Serializer;
/**
@@ -20,12 +21,10 @@
private String name;
private boolean partitionsEnabled = true;
private boolean readOnly = false;
- private final Database partitionedDatabase;
- private final Database inMemoryDatabase;
+ private final DatabaseManager manager;
- public DefaultConsistentMapBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
- this.inMemoryDatabase = inMemoryDatabase;
- this.partitionedDatabase = partitionedDatabase;
+ public DefaultConsistentMapBuilder(DatabaseManager manager) {
+ this.manager = manager;
}
@Override
@@ -60,21 +59,25 @@
@Override
public ConsistentMap<K, V> build() {
- checkState(validInputs());
- return new DefaultConsistentMap<>(
- name,
- partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
- serializer,
- readOnly);
+ return new DefaultConsistentMap<>(buildAndRegisterMap());
}
@Override
public AsyncConsistentMap<K, V> buildAsyncMap() {
+ return buildAndRegisterMap();
+ }
+
+ private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() {
checkState(validInputs());
- return new DefaultAsyncConsistentMap<>(
+ DefaultAsyncConsistentMap<K, V> asyncMap = new DefaultAsyncConsistentMap<>(
name,
- partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
+ partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
serializer,
- readOnly);
+ readOnly,
+ event -> manager.clusterCommunicator.<MapEvent<K, V>>broadcast(event,
+ DatabaseManager.mapUpdatesSubject(name),
+ serializer::encode));
+ manager.registerMap(asyncMap);
+ return asyncMap;
}
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
index adc9dcd..ba5ff95 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
@@ -17,11 +17,17 @@
import java.util.Collection;
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.SetEvent;
+import org.onosproject.store.service.SetEventListener;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
@@ -29,12 +35,15 @@
* @param <E> set element type
*/
-public class DefaultDistributedSet<E> implements Set<E> {
+public class DefaultDistributedSet<E> implements DistributedSet<E> {
+ private final String name;
private final ConsistentMap<E, Boolean> backingMap;
+ private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
- public DefaultDistributedSet(String name, Database database, Serializer serializer, boolean readOnly) {
- backingMap = new DefaultConsistentMap<>(name, database, serializer, readOnly);
+ public DefaultDistributedSet(String name, ConsistentMap<E, Boolean> backingMap) {
+ this.name = name;
+ this.backingMap = backingMap;
}
@Override
@@ -76,7 +85,7 @@
@SuppressWarnings("unchecked")
@Override
public boolean remove(Object o) {
- return backingMap.remove((E) o, true);
+ return backingMap.remove((E) o) != null;
}
@Override
@@ -119,4 +128,26 @@
public void clear() {
backingMap.clear();
}
+
+ @Override
+ public void addListener(SetEventListener<E> listener) {
+ MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
+ if (mapEvent.type() == MapEvent.Type.INSERT) {
+ listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
+ } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
+ listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
+ }
+ };
+ if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
+ backingMap.addListener(mapEventListener);
+ }
+ }
+
+ @Override
+ public void removeListener(SetEventListener<E> listener) {
+ MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
+ if (mapEventListener != null) {
+ backingMap.removeListener(mapEventListener);
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
new file mode 100644
index 0000000..57ec232
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.DistributedSetBuilder;
+
+/**
+ * Default distributed set builder.
+ *
+ * @param <E> type for set elements
+ */
+public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E> {
+
+ private String name;
+ private ConsistentMapBuilder<E, Boolean> mapBuilder;
+
+ public DefaultDistributedSetBuilder(DatabaseManager manager) {
+ this.mapBuilder = manager.consistentMapBuilder();
+ }
+
+ @Override
+ public DistributedSetBuilder<E> withName(String name) {
+ mapBuilder.withName(name);
+ this.name = name;
+ return this;
+ }
+
+ @Override
+ public DistributedSetBuilder<E> withSerializer(Serializer serializer) {
+ mapBuilder.withSerializer(serializer);
+ return this;
+ }
+
+ @Override
+ public DistributedSetBuilder<E> withUpdatesDisabled() {
+ mapBuilder.withUpdatesDisabled();
+ return this;
+ }
+
+ @Override
+ public DistributedSetBuilder<E> withPartitionsDisabled() {
+ mapBuilder.withPartitionsDisabled();
+ return this;
+ }
+
+ @Override
+ public DistributedSet<E> build() {
+ return new DefaultDistributedSet<E>(name, mapBuilder.build());
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultSetBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultSetBuilder.java
deleted file mode 100644
index 566953f..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultSetBuilder.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.consistent.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.util.Set;
-
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.SetBuilder;
-
-/**
- * Default Set builder.
- *
- * @param <E> type for set elements
- */
-public class DefaultSetBuilder<E> implements SetBuilder<E> {
-
- private Serializer serializer;
- private String name;
- private final Database database;
- private boolean readOnly;
-
- public DefaultSetBuilder(Database database) {
- this.database = checkNotNull(database);
- }
-
- @Override
- public SetBuilder<E> withName(String name) {
- checkArgument(name != null && !name.isEmpty());
- this.name = name;
- return this;
- }
-
- @Override
- public SetBuilder<E> withSerializer(Serializer serializer) {
- checkArgument(serializer != null);
- this.serializer = serializer;
- return this;
- }
-
- @Override
- public SetBuilder<E> withUpdatesDisabled() {
- readOnly = true;
- return this;
- }
-
- private boolean validInputs() {
- return name != null && serializer != null;
- }
-
- @Override
- public Set<E> build() {
- checkState(validInputs());
- return new DefaultDistributedSet<>(name, database, serializer, readOnly);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
index 5570e30..b0ab575 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
@@ -18,9 +18,11 @@
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import static com.google.common.base.Preconditions.*;
+import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
@@ -41,10 +43,14 @@
private boolean isOpen = false;
private final Database database;
private final long transactionId;
+ private final Supplier<ConsistentMapBuilder> mapBuilderSupplier;
- public DefaultTransactionContext(Database database, long transactionId) {
- this.database = checkNotNull(database);
+ public DefaultTransactionContext(long transactionId,
+ Database database,
+ Supplier<ConsistentMapBuilder> mapBuilderSupplier) {
this.transactionId = transactionId;
+ this.database = checkNotNull(database);
+ this.mapBuilderSupplier = checkNotNull(mapBuilderSupplier);
}
@Override
@@ -72,7 +78,7 @@
checkNotNull(serializer);
return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
name,
- new DefaultConsistentMap<>(name, database, serializer, false),
+ mapBuilderSupplier.get().withName(name).withSerializer(serializer).build(),
this,
serializer));
}
@@ -85,6 +91,7 @@
List<DatabaseUpdate> updates = Lists.newLinkedList();
txMaps.values()
.forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); });
+ // FIXME: Updates made via transactional context currently do not result in notifications. (ONOS-2097)
database.prepareAndCommit(new DefaultTransaction(transactionId, updates));
} catch (Exception e) {
abort();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java
index a3484cf..16e22aa 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContextBuilder.java
@@ -10,14 +10,11 @@
public class DefaultTransactionContextBuilder implements TransactionContextBuilder {
private boolean partitionsEnabled = true;
- private final Database partitionedDatabase;
- private final Database inMemoryDatabase;
+ private final DatabaseManager manager;
private final long transactionId;
- public DefaultTransactionContextBuilder(
- Database inMemoryDatabase, Database partitionedDatabase, long transactionId) {
- this.partitionedDatabase = partitionedDatabase;
- this.inMemoryDatabase = inMemoryDatabase;
+ public DefaultTransactionContextBuilder(DatabaseManager manager, long transactionId) {
+ this.manager = manager;
this.transactionId = transactionId;
}
@@ -30,8 +27,9 @@
@Override
public TransactionContext build() {
return new DefaultTransactionContext(
- partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
- transactionId);
+ transactionId,
+ partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
+ () -> partitionsEnabled ? manager.consistentMapBuilder()
+ : manager.consistentMapBuilder().withPartitionsDisabled());
}
-
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 2903e0a..7b279dc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -50,6 +50,7 @@
private final List<Database> partitions;
private final AtomicBoolean isOpen = new AtomicBoolean(false);
private static final String DB_NOT_OPEN = "Partitioned Database is not open";
+ private TransactionManager transactionManager;
public PartitionedDatabase(
String name,
@@ -285,7 +286,10 @@
subTransactions.entrySet().iterator().next();
return entry.getKey().prepareAndCommit(entry.getValue());
} else {
- return new TransactionManager(this).execute(transaction);
+ if (transactionManager != null) {
+ throw new IllegalStateException("TransactionManager is not initialized");
+ }
+ return transactionManager.execute(transaction);
}
}
@@ -387,4 +391,8 @@
perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
return subTransactions;
}
+
+ protected void setTransactionManager(TransactionManager tranasactionManager) {
+ this.transactionManager = transactionManager;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
index d8e9930..245ae1a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/TransactionManager.java
@@ -17,6 +17,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -26,6 +27,7 @@
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Transaction;
@@ -49,7 +51,7 @@
.register(ImmutablePair.class)
.build();
- private final Serializer serializer = Serializer.using(KRYO_NAMESPACE);
+ private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE));
private final Database database;
private final AsyncConsistentMap<Long, Transaction> transactions;
@@ -58,9 +60,11 @@
*
* @param database database
*/
- public TransactionManager(Database database) {
+ public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) {
this.database = checkNotNull(database, "database cannot be null");
- this.transactions = new DefaultAsyncConsistentMap<>("onos-transactions", this.database, serializer, false);
+ this.transactions = mapBuilder.withName("onos-transactions")
+ .withSerializer(serializer)
+ .buildAsyncMap();
}
/**