Removed deprecated ClusterCommunicationService APIs
MessagingService::sendAsync now returns a CompletableFuture<Void> instead of a boolean
Change-Id: I98134c4c0ea65b9c7e9ba705eebd1669067324ef
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index f03215d..18ecefe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -509,12 +509,6 @@
);
}
- private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
- return clusterCommunicator.unicast(event, subject, serializer::encode, peer);
- // Note: we had this flipped before...
-// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
- }
-
private boolean underHighLoad() {
return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
}
@@ -556,10 +550,14 @@
}
AntiEntropyAdvertisement<K> ad = createAdvertisement();
+ NodeId destination = peer;
+ clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to send anti-entropy advertisement to {}", destination);
+ }
+ });
- if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
- log.debug("Failed to send anti-entropy advertisement to {}", peer);
- }
} catch (Exception e) {
// Catch all exceptions to avoid scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
@@ -595,9 +593,14 @@
// Send the advertisement back if this peer is out-of-sync
final NodeId sender = ad.sender();
AntiEntropyAdvertisement<K> myAd = createAdvertisement();
- if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
- log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
- }
+
+ clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to send reactive "
+ + "anti-entropy advertisement to {}", sender);
+ }
+ });
break;
}
}
@@ -801,11 +804,15 @@
)
);
communicationExecutor.submit(() -> {
- try {
- unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
- } catch (Exception e) {
- log.warn("broadcast error", e);
- }
+ clusterCommunicator.unicast(Lists.newArrayList(map.values()),
+ updateMessageSubject,
+ serializer::encode,
+ peer)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to send to {}", peer);
+ }
+ });
});
}
}