Removed usage of deprecated ClusterCommunicationService APIs
Change-Id: Id306dadad48d1bad7b3fbde3a40ba3e0fdac4cbc
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 027378a..24ce215 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -35,8 +35,6 @@
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
@@ -104,9 +102,10 @@
MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/packet", "message-handlers"));
- communicationService.addSubscriber(PACKET_OUT_SUBJECT,
- new InternalClusterMessageHandler(),
- messageHandlingExecutor);
+ communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
+ SERIALIZER::decode,
+ packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
+ messageHandlingExecutor);
tracker = new PacketRequestTracker();
@@ -134,9 +133,12 @@
return;
}
- // TODO check unicast return value
- communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
- // error log: log.warn("Failed to send packet-out to {}", master);
+ communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
+ .whenComplete((r, error) -> {
+ if (error != null) {
+ log.warn("Failed to send packet-out to {}", master, error);
+ }
+ });
}
@Override
@@ -154,21 +156,6 @@
return tracker.requests();
}
- /**
- * Handles incoming cluster messages.
- */
- private class InternalClusterMessageHandler implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
- log.warn("Received message with wrong subject: {}", message);
- }
-
- OutboundPacket packet = SERIALIZER.decode(message.payload());
- notifyDelegate(new PacketEvent(Type.EMIT, packet));
- }
- }
-
private class PacketRequestTracker {
private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;