Removed deprecated ClusterCommunicationService APIs
MessagingService::sendAsync now returns a CompletableFuture<Void> instead of a boolean
Change-Id: I98134c4c0ea65b9c7e9ba705eebd1669067324ef
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
index 23b219b..823f658 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
@@ -66,27 +66,27 @@
}
@Override
- public EventuallyConsistentMapBuilder withName(String name) {
+ public EventuallyConsistentMapBuilder<K, V> withName(String name) {
this.name = checkNotNull(name);
return this;
}
@Override
- public EventuallyConsistentMapBuilder withSerializer(
+ public EventuallyConsistentMapBuilder<K, V> withSerializer(
KryoNamespace.Builder serializerBuilder) {
this.serializerBuilder = checkNotNull(serializerBuilder);
return this;
}
@Override
- public EventuallyConsistentMapBuilder withClockService(
+ public EventuallyConsistentMapBuilder<K, V> withClockService(
ClockService<K, V> clockService) {
this.clockService = checkNotNull(clockService);
return this;
}
@Override
- public EventuallyConsistentMapBuilder withEventExecutor(ExecutorService executor) {
+ public EventuallyConsistentMapBuilder<K, V> withEventExecutor(ExecutorService executor) {
this.eventExecutor = checkNotNull(executor);
return this;
}
@@ -99,13 +99,13 @@
}
@Override
- public EventuallyConsistentMapBuilder withBackgroundExecutor(ScheduledExecutorService executor) {
+ public EventuallyConsistentMapBuilder<K, V> withBackgroundExecutor(ScheduledExecutorService executor) {
this.backgroundExecutor = checkNotNull(executor);
return this;
}
@Override
- public EventuallyConsistentMapBuilder withPeerUpdateFunction(
+ public EventuallyConsistentMapBuilder<K, V> withPeerUpdateFunction(
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
return this;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index f03215d..18ecefe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -509,12 +509,6 @@
);
}
- private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
- return clusterCommunicator.unicast(event, subject, serializer::encode, peer);
- // Note: we had this flipped before...
-// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
- }
-
private boolean underHighLoad() {
return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
}
@@ -556,10 +550,14 @@
}
AntiEntropyAdvertisement<K> ad = createAdvertisement();
+ NodeId destination = peer;
+ clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to send anti-entropy advertisement to {}", destination);
+ }
+ });
- if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
- log.debug("Failed to send anti-entropy advertisement to {}", peer);
- }
} catch (Exception e) {
// Catch all exceptions to avoid scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
@@ -595,9 +593,14 @@
// Send the advertisement back if this peer is out-of-sync
final NodeId sender = ad.sender();
AntiEntropyAdvertisement<K> myAd = createAdvertisement();
- if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
- log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
- }
+
+ clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to send reactive "
+ + "anti-entropy advertisement to {}", sender);
+ }
+ });
break;
}
}
@@ -801,11 +804,15 @@
)
);
communicationExecutor.submit(() -> {
- try {
- unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
- } catch (Exception e) {
- log.warn("broadcast error", e);
- }
+ clusterCommunicator.unicast(Lists.newArrayList(map.values()),
+ updateMessageSubject,
+ serializer::encode,
+ peer)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to send to {}", peer);
+ }
+ });
});
}
}