Removed usage of deprecated ClusterCommunicationService APIs
Change-Id: Id306dadad48d1bad7b3fbde3a40ba3e0fdac4cbc
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 3261f33..0c902bf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -17,6 +17,7 @@
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -39,8 +40,6 @@
import org.onosproject.core.DefaultApplication;
import org.onosproject.core.Permission;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
@@ -48,10 +47,12 @@
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.MultiValuedTimestamp;
+import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -126,7 +127,17 @@
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/app", "message-handler"));
- clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
+ clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
+ bytes -> new String(bytes, Charsets.UTF_8),
+ name -> {
+ try {
+ return toByteArray(getApplicationInputStream(name));
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ },
+ Function.identity(),
+ messageHandlingExecutor);
// FIXME: Consider consolidating into a single map.
@@ -394,21 +405,6 @@
}
/**
- * Responder to requests for application bits.
- */
- private class InternalBitServer implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- String name = new String(message.payload(), Charsets.UTF_8);
- try {
- message.respond(toByteArray(getApplicationInputStream(name)));
- } catch (Exception e) {
- log.debug("Unable to read bits for application {}", name);
- }
- }
- }
-
- /**
* Prunes applications which are not in the map, but are on disk.
*/
private void pruneUninstalledApps() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index cd42b17..2c682a1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -60,8 +60,6 @@
import org.onosproject.net.group.StoredGroupEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.serializers.DeviceIdSerializer;
import org.onosproject.store.serializers.KryoNamespaces;
@@ -198,10 +196,11 @@
newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/group",
"message-handlers"));
- clusterCommunicator.
- addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- new ClusterGroupMsgHandler(),
- messageHandlingExecutor);
+
+ clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ kryoBuilder.build()::deserialize,
+ this::process,
+ messageHandlingExecutor);
log.debug("Creating EC map groupstorekeymap");
EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
@@ -970,45 +969,27 @@
}
}
}
- /**
- * Message handler to receive messages from group subsystems of
- * other cluster members.
- */
- private final class ClusterGroupMsgHandler
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- if (message.subject().equals(
- GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST)) {
- GroupStoreMessage groupOp = kryoBuilder.
- build().deserialize(message.payload());
- log.debug("received remote group operation {} request for device {}",
- groupOp.type(),
- groupOp.deviceId());
- if (mastershipService.
- getLocalRole(groupOp.deviceId()) !=
- MastershipRole.MASTER) {
- log.warn("ClusterGroupMsgHandler: This node is not "
- + "MASTER for device {}", groupOp.deviceId());
- return;
- }
- if (groupOp.type() == GroupStoreMessage.Type.ADD) {
- storeGroupDescriptionInternal(groupOp.groupDesc());
- } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
- updateGroupDescriptionInternal(groupOp.deviceId(),
- groupOp.appCookie(),
- groupOp.updateType(),
- groupOp.updateBuckets(),
- groupOp.newAppCookie());
- } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
- deleteGroupDescriptionInternal(groupOp.deviceId(),
- groupOp.appCookie());
- }
- } else {
- log.warn("ClusterGroupMsgHandler: Unknown remote message type {}",
- message.subject());
- }
- }
+
+ private void process(GroupStoreMessage groupOp) {
+ log.debug("Received remote group operation {} request for device {}",
+ groupOp.type(),
+ groupOp.deviceId());
+ if (!mastershipService.isLocalMaster(groupOp.deviceId())) {
+ log.warn("This node is not MASTER for device {}", groupOp.deviceId());
+ return;
+ }
+ if (groupOp.type() == GroupStoreMessage.Type.ADD) {
+ storeGroupDescriptionInternal(groupOp.groupDesc());
+ } else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
+ updateGroupDescriptionInternal(groupOp.deviceId(),
+ groupOp.appCookie(),
+ groupOp.updateType(),
+ groupOp.updateBuckets(),
+ groupOp.newAppCookie());
+ } else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
+ deleteGroupDescriptionInternal(groupOp.deviceId(),
+ groupOp.appCookie());
+ }
}
/**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 027378a..24ce215 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -35,8 +35,6 @@
import org.onosproject.net.packet.PacketStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
@@ -104,9 +102,10 @@
MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/packet", "message-handlers"));
- communicationService.addSubscriber(PACKET_OUT_SUBJECT,
- new InternalClusterMessageHandler(),
- messageHandlingExecutor);
+ communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
+ SERIALIZER::decode,
+ packet -> notifyDelegate(new PacketEvent(Type.EMIT, packet)),
+ messageHandlingExecutor);
tracker = new PacketRequestTracker();
@@ -134,9 +133,12 @@
return;
}
- // TODO check unicast return value
- communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
- // error log: log.warn("Failed to send packet-out to {}", master);
+ communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master)
+ .whenComplete((r, error) -> {
+ if (error != null) {
+ log.warn("Failed to send packet-out to {}", master, error);
+ }
+ });
}
@Override
@@ -154,21 +156,6 @@
return tracker.requests();
}
- /**
- * Handles incoming cluster messages.
- */
- private class InternalClusterMessageHandler implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- if (!message.subject().equals(PACKET_OUT_SUBJECT)) {
- log.warn("Received message with wrong subject: {}", message);
- }
-
- OutboundPacket packet = SERIALIZER.decode(message.payload());
- notifyDelegate(new PacketEvent(Type.EMIT, packet));
- }
- }
-
private class PacketRequestTracker {
private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;