Removed usage of deprecated ClusterCommunicationService APIs

Change-Id: Id306dadad48d1bad7b3fbde3a40ba3e0fdac4cbc
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 3261f33..0c902bf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -17,6 +17,7 @@
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableSet;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -39,8 +40,6 @@
 import org.onosproject.core.DefaultApplication;
 import org.onosproject.core.Permission;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.EventuallyConsistentMap;
@@ -48,10 +47,12 @@
 import org.onosproject.store.service.EventuallyConsistentMapListener;
 import org.onosproject.store.service.LogicalClockService;
 import org.onosproject.store.service.MultiValuedTimestamp;
+import org.onosproject.store.service.StorageException;
 import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -126,7 +127,17 @@
         messageHandlingExecutor = Executors.newSingleThreadExecutor(
                 groupedThreads("onos/store/app", "message-handler"));
 
-        clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
+        clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
+                bytes -> new String(bytes, Charsets.UTF_8),
+                name -> {
+                    try {
+                        return toByteArray(getApplicationInputStream(name));
+                    } catch (IOException e) {
+                        throw new StorageException(e);
+                    }
+                },
+                Function.identity(),
+                messageHandlingExecutor);
 
         // FIXME: Consider consolidating into a single map.
 
@@ -394,21 +405,6 @@
     }
 
     /**
-     * Responder to requests for application bits.
-     */
-    private class InternalBitServer implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            String name = new String(message.payload(), Charsets.UTF_8);
-            try {
-                message.respond(toByteArray(getApplicationInputStream(name)));
-            } catch (Exception e) {
-                log.debug("Unable to read bits for application {}", name);
-            }
-        }
-    }
-
-    /**
      * Prunes applications which are not in the map, but are on disk.
      */
     private void pruneUninstalledApps() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index cd42b17..2c682a1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -60,8 +60,6 @@
 import org.onosproject.net.group.StoredGroupEntry;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.service.MultiValuedTimestamp;
 import org.onosproject.store.serializers.DeviceIdSerializer;
 import org.onosproject.store.serializers.KryoNamespaces;
@@ -198,10 +196,11 @@
                 newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
                                    groupedThreads("onos/store/group",
                                                   "message-handlers"));
-        clusterCommunicator.
-            addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
-                          new ClusterGroupMsgHandler(),
-                          messageHandlingExecutor);
+
+        clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+                kryoBuilder.build()::deserialize,
+                this::process,
+                messageHandlingExecutor);
 
         log.debug("Creating EC map groupstorekeymap");
         EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
@@ -970,45 +969,27 @@
             }
         }
     }
-    /**
-     * Message handler to receive messages from group subsystems of
-     * other cluster members.
-     */
-    private final class ClusterGroupMsgHandler
-                    implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            if (message.subject().equals(
-                    GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST)) {
-                GroupStoreMessage groupOp = kryoBuilder.
-                        build().deserialize(message.payload());
-                log.debug("received remote group operation {} request for device {}",
-                          groupOp.type(),
-                          groupOp.deviceId());
-                if (mastershipService.
-                        getLocalRole(groupOp.deviceId()) !=
-                        MastershipRole.MASTER) {
-                    log.warn("ClusterGroupMsgHandler: This node is not "
-                            + "MASTER for device {}", groupOp.deviceId());
-                    return;
-                }
-                if (groupOp.type() == GroupStoreMessage.Type.ADD) {
-                    storeGroupDescriptionInternal(groupOp.groupDesc());
-                } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
-                    updateGroupDescriptionInternal(groupOp.deviceId(),
-                                                   groupOp.appCookie(),
-                                                   groupOp.updateType(),
-                                                   groupOp.updateBuckets(),
-                                                   groupOp.newAppCookie());
-                } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
-                    deleteGroupDescriptionInternal(groupOp.deviceId(),
-                                                   groupOp.appCookie());
-                }
-            } else {
-                log.warn("ClusterGroupMsgHandler: Unknown remote message type {}",
-                         message.subject());
-            }
-        }
+
+    private void process(GroupStoreMessage groupOp) {
+        log.debug("Received remote group operation {} request for device {}",
+                groupOp.type(),
+                groupOp.deviceId());
+      if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
+          log.warn("This node is not MASTER for device {}", groupOp.deviceId());
+          return;
+      }
+      if (groupOp.type() == GroupStoreMessage.Type.ADD) {
+          storeGroupDescriptionInternal(groupOp.groupDesc());
+      } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
+          updateGroupDescriptionInternal(groupOp.deviceId(),
+                                         groupOp.appCookie(),
+                                         groupOp.updateType(),
+                                         groupOp.updateBuckets(),
+                                         groupOp.newAppCookie());
+      } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
+          deleteGroupDescriptionInternal(groupOp.deviceId(),
+                                         groupOp.appCookie());
+      }
     }
 
     /**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 027378a..24ce215 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -35,8 +35,6 @@
 import org.onosproject.net.packet.PacketStoreDelegate;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
@@ -104,9 +102,10 @@
                 MESSAGE_HANDLER_THREAD_POOL_SIZE,
                 groupedThreads("onos/store/packet", "message-handlers"));
 
-        communicationService.addSubscriber(PACKET_OUT_SUBJECT,
-                                           new InternalClusterMessageHandler(),
-                                           messageHandlingExecutor);
+        communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
+                SERIALIZER::decode,
+                packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
+                messageHandlingExecutor);
 
         tracker = new PacketRequestTracker();
 
@@ -134,9 +133,12 @@
             return;
         }
 
-        // TODO check unicast return value
-        communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
-        // error log: log.warn("Failed to send packet-out to {}", master);
+        communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
+                            .whenComplete((r, error) -> {
+                                if (error != null) {
+                                    log.warn("Failed to send packet-out to {}", master, error);
+                                }
+                            });
     }
 
     @Override
@@ -154,21 +156,6 @@
         return tracker.requests();
     }
 
-    /**
-     * Handles incoming cluster messages.
-     */
-    private class InternalClusterMessageHandler implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
-                log.warn("Received message with wrong subject: {}", message);
-            }
-
-            OutboundPacket packet = SERIALIZER.decode(message.payload());
-            notifyDelegate(new PacketEvent(Type.EMIT, packet));
-        }
-    }
-
     private class PacketRequestTracker {
 
         private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;