Removed deprecated ClusterCommunicationService APIs
MessagingService::sendAsync now returns a CompletableFuture<Void> in place of boolean

Change-Id: I98134c4c0ea65b9c7e9ba705eebd1669067324ef
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index f03215d..18ecefe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -509,12 +509,6 @@
         );
     }
 
-    private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
-        return clusterCommunicator.unicast(event, subject, serializer::encode, peer);
-        // Note: we had this flipped before...
-//        communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
-    }
-
     private boolean underHighLoad() {
         return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
     }
@@ -556,10 +550,14 @@
                 }
 
                 AntiEntropyAdvertisement<K> ad = createAdvertisement();
+                NodeId destination = peer;
+                clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
+                                   .whenComplete((result, error) -> {
+                                       if (error != null) {
+                                           log.debug("Failed to send anti-entropy advertisement to {}", destination);
+                                       }
+                                   });
 
-                if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
-                    log.debug("Failed to send anti-entropy advertisement to {}", peer);
-                }
             } catch (Exception e) {
                 // Catch all exceptions to avoid scheduled task being suppressed.
                 log.error("Exception thrown while sending advertisement", e);
@@ -595,9 +593,14 @@
                     // Send the advertisement back if this peer is out-of-sync
                     final NodeId sender = ad.sender();
                     AntiEntropyAdvertisement<K> myAd = createAdvertisement();
-                    if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
-                        log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
-                    }
+
+                    clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
+                                       .whenComplete((result, error) -> {
+                                           if (error != null) {
+                                               log.debug("Failed to send reactive "
+                                                       + "anti-entropy advertisement to {}", sender);
+                                           }
+                                       });
                     break;
                 }
             }
@@ -801,11 +804,15 @@
                   )
             );
             communicationExecutor.submit(() -> {
-                try {
-                    unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
-                } catch (Exception e) {
-                    log.warn("broadcast error", e);
-                }
+                clusterCommunicator.unicast(Lists.newArrayList(map.values()),
+                                            updateMessageSubject,
+                                            serializer::encode,
+                                            peer)
+                                   .whenComplete((result, error) -> {
+                                       if (error != null) {
+                                           log.debug("Failed to send to {}", peer);
+                                       }
+                                   });
             });
         }
     }