Removed usage of deprecated ClusterCommunicationService APIs

Change-Id: Id306dadad48d1bad7b3fbde3a40ba3e0fdac4cbc
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 027378a..24ce215 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -35,8 +35,6 @@
 import org.onosproject.net.packet.PacketStoreDelegate;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
@@ -104,9 +102,10 @@
                 MESSAGE_HANDLER_THREAD_POOL_SIZE,
                 groupedThreads("onos/store/packet", "message-handlers"));
 
-        communicationService.addSubscriber(PACKET_OUT_SUBJECT,
-                                           new InternalClusterMessageHandler(),
-                                           messageHandlingExecutor);
+        communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
+                SERIALIZER::decode,
+                packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
+                messageHandlingExecutor);
 
         tracker = new PacketRequestTracker();
 
@@ -134,9 +133,12 @@
             return;
         }
 
-        // TODO check unicast return value
-        communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
-        // error log: log.warn("Failed to send packet-out to {}", master);
+        communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
+                            .whenComplete((r, error) -> {
+                                if (error != null) {
+                                    log.warn("Failed to send packet-out to {}", master, error);
+                                }
+                            });
     }
 
     @Override
@@ -154,21 +156,6 @@
         return tracker.requests();
     }
 
-    /**
-     * Handles incoming cluster messages.
-     */
-    private class InternalClusterMessageHandler implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
-                log.warn("Received message with wrong subject: {}", message);
-            }
-
-            OutboundPacket packet = SERIALIZER.decode(message.payload());
-            notifyDelegate(new PacketEvent(Type.EMIT, packet));
-        }
-    }
-
     private class PacketRequestTracker {
 
         private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;