Removed deprecated map change notification support
Change-Id: Ibff2e403129ee026092a24fc15b82e80ffb8dc48
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index d04f5c7..8cb587d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -20,9 +20,9 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
@@ -68,7 +68,6 @@
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
-import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapInfo;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.DistributedSetBuilder;
@@ -126,8 +125,8 @@
private ExecutorService queuePollExecutor;
private ApplicationListener appListener = new InternalApplicationListener();
- private final Map<String, DefaultAsyncConsistentMap> maps = Maps.newConcurrentMap();
- private final ListMultimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication = ArrayListMultimap.create();
+ private final Multimap<String, DefaultAsyncConsistentMap> maps = ArrayListMultimap.create();
+ private final Multimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication = ArrayListMultimap.create();
private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -451,28 +450,15 @@
}
protected <K, V> DefaultAsyncConsistentMap<K, V> registerMap(DefaultAsyncConsistentMap<K, V> map) {
- DefaultAsyncConsistentMap<K, V> existing = maps.putIfAbsent(map.name(), map);
- if (existing != null) {
- // FIXME: We need to cleanly support different map instances with same name.
- log.info("Map by name {} already exists", map.name());
- return existing;
- } else {
- if (map.applicationId() != null) {
- mapsByApplication.put(map.applicationId(), map);
- }
+ maps.put(map.name(), map);
+ if (map.applicationId() != null) {
+ mapsByApplication.put(map.applicationId(), map);
}
-
- clusterCommunicator.<MapEvent<K, V>>addSubscriber(mapUpdatesSubject(map.name()),
- map.serializer()::decode,
- map::notifyLocalListeners,
- eventDispatcher);
return map;
}
protected <K, V> void unregisterMap(DefaultAsyncConsistentMap<K, V> map) {
- if (maps.remove(map.name()) != null) {
- clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name()));
- }
+ maps.remove(map.name(), map);
if (map.applicationId() != null) {
mapsByApplication.remove(map.applicationId(), map);
}
@@ -485,10 +471,6 @@
}
}
- protected static MessageSubject mapUpdatesSubject(String mapName) {
- return new MessageSubject(mapName + "-map-updates");
- }
-
private class InternalApplicationListener implements ApplicationListener {
@Override
public void event(ApplicationEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 3fbdd2f..3d73d84 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -28,7 +28,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -74,7 +73,6 @@
private final Serializer serializer;
private final boolean readOnly;
private final boolean purgeOnUninstall;
- private final Consumer<MapEvent<K, V>> eventPublisher;
private final MetricsService metricsService;
private final MetricsComponent metricsComponent;
@@ -130,22 +128,20 @@
Database database,
Serializer serializer,
boolean readOnly,
- boolean purgeOnUninstall,
- Consumer<MapEvent<K, V>> eventPublisher) {
+ boolean purgeOnUninstall) {
this.name = checkNotNull(name, "map name cannot be null");
this.applicationId = applicationId;
this.database = checkNotNull(database, "database cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.readOnly = readOnly;
this.purgeOnUninstall = purgeOnUninstall;
- this.eventPublisher = eventPublisher;
this.database.registerConsumer(update -> {
SharedExecutors.getSingleThreadExecutor().execute(() -> {
if (update.target() == MAP) {
Result<UpdateResult<String, byte[]>> result = update.output();
if (result.success() && result.value().mapName().equals(name)) {
MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
- notifyLocalListeners(mapEvent);
+ notifyListeners(mapEvent);
}
}
});
@@ -423,12 +419,7 @@
oldVersionMatch,
value == null ? null : serializer.encode(value))
.thenApply(this::unwrapResult)
- .thenApply(r -> r.<K, V>map(this::dK, serializer::decode))
- .whenComplete((r, e) -> {
- if (r != null && e == null && !database.hasChangeNotificationSupport()) {
- notifyListeners(r.toMapEvent());
- }
- });
+ .thenApply(r -> r.<K, V>map(this::dK, serializer::decode));
}
private <T> T unwrapResult(Result<T> result) {
@@ -458,26 +449,16 @@
}
protected void notifyListeners(MapEvent<K, V> event) {
- try {
- if (event != null) {
- notifyLocalListeners(event);
- notifyRemoteListeners(event);
+ if (event == null) {
+ return;
+ }
+ listeners.forEach(listener -> {
+ try {
+ listener.event(event);
+ } catch (Exception e) {
+ log.warn("Failure notifying listener about {}", event, e);
}
- } catch (Exception e) {
- log.warn("Failure notifying listeners about {}", event, e);
- }
- }
-
- protected void notifyLocalListeners(MapEvent<K, V> event) {
- if (event != null) {
- listeners.forEach(listener -> listener.event(event));
- }
- }
-
- protected void notifyRemoteListeners(MapEvent<K, V> event) {
- if (eventPublisher != null) {
- eventPublisher.accept(event);
- }
+ });
}
private OperationTimer startTimer(String op) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
index 2b07140..8926c51 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
@@ -22,7 +22,6 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
-import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Serializer;
/**
@@ -110,10 +109,7 @@
partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
serializer,
readOnly,
- purgeOnUninstall,
- event -> manager.clusterCommunicator.<MapEvent<K, V>>broadcast(event,
- DatabaseManager.mapUpdatesSubject(name),
- serializer::encode));
+ purgeOnUninstall);
return manager.registerMap(asyncMap);
}
}
\ No newline at end of file