Reduce use of deprecated ClusterCommunicationService.addSubscriber()
Change-Id: I2e78e63340473b0334a1537f8049753a9f400849
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
index ad3236e..2a8c14a 100644
--- a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -54,8 +54,6 @@
import org.onosproject.net.intent.PartitionService;
import org.onosproject.net.intent.PointToPointIntent;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
@@ -71,6 +69,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
@@ -186,7 +185,7 @@
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/perf", "command-handler"));
- communicationService.addSubscriber(CONTROL, new InternalControl(),
+ communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
messageHandlingExecutor);
listener = new Listener();
@@ -572,10 +571,9 @@
}
}
- private class InternalControl implements ClusterMessageHandler {
+ private class InternalControl implements Consumer<String> {
@Override
- public void handle(ClusterMessage message) {
- String cmd = new String(message.payload());
+ public void accept(String cmd) {
log.info("Received command {}", cmd);
if (cmd.equals(START)) {
startTestRun();
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
index a7183de..c64a091 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/resource/impl/DistributedLabelResourceStore.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2015 Open Networking Laboratory
+ * Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -55,8 +55,6 @@
import org.onosproject.net.device.DeviceService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
@@ -126,62 +124,42 @@
"message-handlers"));
clusterCommunicator
.addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
- new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- LabelResourcePool operation = SERIALIZER
- .decode(message.payload());
- log.trace("received get flow entry request for {}",
- operation);
- boolean b = internalCreate(operation);
- message.respond(SERIALIZER.encode(b));
- }
- }, messageHandlingExecutor);
+ SERIALIZER::<LabelResourcePool>decode,
+ operation -> {
+ log.trace("received get flow entry request for {}", operation);
+ return internalCreate(operation);
+ },
+ SERIALIZER::<Boolean>encode,
+ messageHandlingExecutor);
clusterCommunicator
.addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
- new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- DeviceId deviceId = SERIALIZER
- .decode(message.payload());
- log.trace("received get flow entry request for {}",
- deviceId);
- boolean b = internalDestroy(deviceId);
- message.respond(SERIALIZER.encode(b));
- }
- }, messageHandlingExecutor);
+ SERIALIZER::<DeviceId>decode,
+ deviceId -> {
+ log.trace("received get flow entry request for {}", deviceId);
+ return internalDestroy(deviceId);
+ },
+ SERIALIZER::<Boolean>encode,
+ messageHandlingExecutor);
clusterCommunicator
.addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY,
- new ClusterMessageHandler() {
+ SERIALIZER::<LabelResourceRequest>decode,
+ request -> {
+ log.trace("received get flow entry request for {}", request);
+ return internalApply(request);
- @Override
- public void handle(ClusterMessage message) {
- LabelResourceRequest request = SERIALIZER
- .decode(message.payload());
- log.trace("received get flow entry request for {}",
- request);
- final Collection<LabelResource> resource = internalApply(request);
- message.respond(SERIALIZER
- .encode(resource));
- }
- }, messageHandlingExecutor);
+ },
+ SERIALIZER::<Collection<LabelResource>>encode,
+ messageHandlingExecutor);
clusterCommunicator
.addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
- new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- LabelResourceRequest request = SERIALIZER
- .decode(message.payload());
- log.trace("received get flow entry request for {}",
- request);
- final boolean isSuccess = internalRelease(request);
- message.respond(SERIALIZER
- .encode(isSuccess));
- }
- }, messageHandlingExecutor);
+ SERIALIZER::<LabelResourceRequest>decode,
+ request -> {
+ log.trace("received get flow entry request for {}",
+ request);
+ return internalRelease(request);
+ },
+ SERIALIZER::<Boolean>encode,
+ messageHandlingExecutor);
log.info("Started");
}