Revamped ClusterCommunicationService API
Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 5c1fc33..7e7ec06 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -17,7 +17,6 @@
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -54,13 +53,14 @@
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+
import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
@@ -351,22 +351,34 @@
*/
private void fetchBits(Application app) {
ControllerNode localNode = clusterService.getLocalNode();
- ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
- app.id().name().getBytes(Charsets.UTF_8));
- //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
CountDownLatch latch = new CountDownLatch(1);
// FIXME: send message with name & version to make sure we don't get served old bits
log.info("Downloading bits for application {}", app.id().name());
for (ControllerNode node : clusterService.getNodes()) {
- try {
- ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
- future.addListener(new InternalBitListener(app, node, future, latch), executor);
- } catch (IOException e) {
- log.debug("Unable to request bits for application {} from node {}",
- app.id().name(), node.id());
+ if (latch.getCount() == 0) {
+ break;
}
+ if (node.equals(localNode)) {
+ continue;
+ }
+ clusterCommunicator.sendAndReceive(app.id().name(),
+ APP_BITS_REQUEST,
+ s -> s.getBytes(Charsets.UTF_8),
+ Function.identity(),
+ node.id())
+ .whenCompleteAsync((bits, error) -> {
+ if (error == null && latch.getCount() > 0) {
+ saveApplication(new ByteArrayInputStream(bits));
+ log.info("Downloaded bits for application {} from node {}",
+ app.id().name(), node.id());
+ latch.countDown();
+ } else if (error != null) {
+ log.warn("Unable to fetch bits for application {} from node {}",
+ app.id().name(), node.id(), error);
+ }
+ }, executor);
}
try {
@@ -392,41 +404,6 @@
}
}
}
-
- /**
- * Processes completed fetch requests.
- */
- private class InternalBitListener implements Runnable {
- private final Application app;
- private final ControllerNode node;
- private final ListenableFuture<byte[]> future;
- private final CountDownLatch latch;
-
- public InternalBitListener(Application app, ControllerNode node,
- ListenableFuture<byte[]> future, CountDownLatch latch) {
- this.app = app;
- this.node = node;
- this.future = future;
- this.latch = latch;
- }
-
- @Override
- public void run() {
- if (latch.getCount() > 0 && !future.isCancelled()) {
- try {
- byte[] bits = future.get(1, MILLISECONDS);
- saveApplication(new ByteArrayInputStream(bits));
- log.info("Downloaded bits for application {} from node {}",
- app.id().name(), node.id());
- latch.countDown();
- } catch (Exception e) {
- log.warn("Unable to fetch bits for application {} from node {}",
- app.id().name(), node.id());
- }
- }
- }
- }
-
/**
* Prunes applications which are not in the map, but are on disk.
*/
@@ -449,6 +426,4 @@
appDesc.origin(), appDesc.permissions(),
appDesc.featuresRepo(), appDesc.features());
}
-
}
-
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 2f6a149..10bf6a4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -419,10 +419,9 @@
// Dispatch to all instances
clusterCommunicator.broadcastIncludeSelf(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(leadershipEvent)));
+ leadershipEvent,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
} else {
//
// Test if time to expire a stale leader
@@ -491,11 +490,11 @@
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
+
clusterCommunicator.broadcastIncludeSelf(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(leadershipEvent)));
+ leadershipEvent,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
}
// Sleep forever until interrupted
@@ -519,11 +518,12 @@
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
+
clusterCommunicator.broadcastIncludeSelf(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(leadershipEvent)));
+ leadershipEvent,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
+
if (leaderLock.isLockedByCurrentThread()) {
leaderLock.unlock();
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 42a79b3..204e0fd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.store.cluster.messaging.impl;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -37,8 +36,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
@@ -122,7 +130,84 @@
return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
}
- private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
+ @Override // ListenableFuture variant: adapts the private CompletableFuture-based sendAndReceive(subject, payload, nodeId) to a Guava future.
+ public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
+ SettableFuture<byte[]> response = SettableFuture.create(); // Guava future handed back to the caller
+ sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> { // r = reply bytes, e = failure (null on success)
+ if (e == null) {
+ response.set(r); // success: forward the reply bytes unchanged
+ } else {
+ response.setException(e); // failure: hand the same throwable to the Guava future
+ }
+ });
+ return response; // completed by the whenComplete callback above
+ }
+
+ @Override // Sends the message to every node returned by clusterService.getNodes() except the local node; delegates to multicast.
+ public <M> void broadcast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder) { // encoder converts the message into the bytes that get sent
+ multicast(message,
+ subject,
+ encoder,
+ clusterService.getNodes()
+ .stream()
+ .filter(node -> !Objects.equal(node, clusterService.getLocalNode())) // exclude the local node
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet())); // set of target NodeIds passed to multicast
+ }
+
+ @Override // Sends the message to every node returned by clusterService.getNodes(), with no filtering of the local node; delegates to multicast.
+ public <M> void broadcastIncludeSelf(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder) { // encoder converts the message into the bytes that get sent
+ multicast(message,
+ subject,
+ encoder,
+ clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet())); // set of target NodeIds passed to multicast
+ }
+
+ @Override // Encodes the message, wraps it in a ClusterMessage and sends it to one node; returns whatever unicastUnchecked returns.
+ public <M> boolean unicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ NodeId toNodeId) {
+ byte[] payload = new ClusterMessage( // envelope carrying the sender id, the subject and the encoded message
+ clusterService.getLocalNode().id(),
+ subject,
+ encoder.apply(message)).getBytes(); // the serialized envelope is what goes on the wire
+ return unicastUnchecked(subject, payload, toNodeId);
+ }
+
+ @Override // Encodes the message once and sends the same payload to each node id in the given set.
+ public <M> void multicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Set<NodeId> nodes) {
+ byte[] payload = new ClusterMessage( // built a single time and reused for every recipient
+ clusterService.getLocalNode().id(),
+ subject,
+ encoder.apply(message)).getBytes();
+ nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId)); // the per-node boolean result is discarded
+ }
+
+ @Override // Request/reply: encodes and sends the message to one node and returns a future of the decoded reply.
+ public <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder,
+ NodeId toNodeId) {
+ ClusterMessage envelope = new ClusterMessage( // wraps the encoded request together with the local node id and subject
+ clusterService.getLocalNode().id(),
+ subject,
+ encoder.apply(message));
+ return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder); // decoder is applied to the raw reply bytes
+ }
+
+ private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
@@ -131,37 +216,15 @@
return true;
} catch (IOException e) {
log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
- throw e;
- }
- }
-
- private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- try {
- return unicast(subject, payload, toNodeId);
- } catch (IOException e) {
return false;
}
}
- @Override
- public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
+ private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- try {
- return messagingService.sendAndReceive(nodeEp, message.subject().value(), message.getBytes());
-
- } catch (IOException e) {
- log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
- throw e;
- }
- }
-
- @Override
- @Deprecated
- public void addSubscriber(MessageSubject subject,
- ClusterMessageHandler subscriber) {
- messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
+ return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
}
@Override
@@ -202,6 +265,60 @@
}
}
+ @Override // Registers a request/reply handler for the subject: decode request -> handler -> encode response (see InternalMessageResponder).
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, R> handler,
+ Function<R, byte[]> encoder,
+ ExecutorService executor) {
+ messagingService.registerHandler(subject.value(),
+ new InternalMessageResponder<>(decoder, encoder, handler), // note the argument order: decoder, encoder, handler
+ executor); // passed through to the messaging service; presumably the handler runs on it - confirm against MessagingService
+ }
+
+ @Override // Registers a one-way handler for the subject: decode message -> consumer, no reply is sent (see InternalMessageConsumer).
+ public <M> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Consumer<M> handler,
+ ExecutorService executor) {
+ messagingService.registerHandler(subject.value(),
+ new InternalMessageConsumer<>(decoder, handler),
+ executor); // passed through to the messaging service; presumably the handler runs on it - confirm against MessagingService
+ }
+
+ private class InternalMessageResponder<M, R> implements MessageHandler { // MessageHandler that decodes a request, applies a function and responds with the encoded result
+ private final Function<byte[], M> decoder; // request bytes -> M
+ private final Function<R, byte[]> encoder; // R -> response bytes
+ private final Function<M, R> handler; // computes the response from the decoded request
+
+ public InternalMessageResponder(Function<byte[], M> decoder,
+ Function<R, byte[]> encoder,
+ Function<M, R> handler) {
+ this.decoder = decoder;
+ this.encoder = encoder;
+ this.handler = handler;
+ }
+ @Override
+ public void handle(Message message) throws IOException {
+ R response = handler.apply(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload())); // unwrap the ClusterMessage envelope, decode its inner payload, run the handler
+ message.respond(encoder.apply(response)); // reply with the encoded result
+ }
+ }
+
+ private class InternalMessageConsumer<M> implements MessageHandler { // MessageHandler that decodes a message and passes it to a Consumer; it never calls respond
+ private final Function<byte[], M> decoder; // message bytes -> M
+ private final Consumer<M> consumer; // receives the decoded message
+
+ public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
+ this.decoder = decoder;
+ this.consumer = consumer;
+ }
+ @Override
+ public void handle(Message message) throws IOException {
+ consumer.accept(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload())); // unwrap the ClusterMessage envelope, decode its inner payload, deliver it
+ }
+ }
+
public static final class InternalClusterMessage extends ClusterMessage {
private final Message rawMessage;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 8bebff7..cf3700b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -343,11 +343,9 @@
private void notifyPeers(LeadershipEvent event) {
eventDispatcher.post(event);
- clusterCommunicator.broadcast(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(event)));
+ clusterCommunicator.broadcast(event,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
}
private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
@@ -366,11 +364,9 @@
if (updatedLeader) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
eventDispatcher.post(event);
- clusterCommunicator.broadcast(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(event)));
+ clusterCommunicator.broadcast(event,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
}
}
@@ -469,11 +465,9 @@
leaderBoard.forEach((path, leadership) -> {
if (leadership.leader().equals(localNodeId)) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
- clusterCommunicator.broadcast(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(event)));
+ clusterCommunicator.broadcast(event,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
}
});
} catch (Exception e) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 131000b..b47376e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -304,11 +304,9 @@
DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
providerId, deviceId, deviceDescription);
- ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
- SERIALIZER.encode(deviceInjectedEvent));
// TODO check unicast return value
- clusterCommunicator.unicast(clusterMessage, deviceNode);
+ clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
/* error log:
log.warn("Failed to process injected device id: {} desc: {} " +
"(cluster messaging failed: {})",
@@ -555,11 +553,9 @@
}
PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
- ClusterMessage clusterMessage = new ClusterMessage(
- localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
//TODO check unicast return value
- clusterCommunicator.unicast(clusterMessage, deviceNode);
+ clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
/* error log:
log.warn("Failed to process injected ports of device id: {} " +
"(cluster messaging failed: {})",
@@ -867,13 +863,8 @@
log.debug("{} has control of {}, forwarding remove request",
master, deviceId);
- ClusterMessage message = new ClusterMessage(
- myId,
- DEVICE_REMOVE_REQ,
- SERIALIZER.encode(deviceId));
-
// TODO check unicast return value
- clusterCommunicator.unicast(message, master);
+ clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
/* error log:
log.error("Failed to forward {} remove request to {}", deviceId, master, e);
*/
@@ -1057,19 +1048,11 @@
}
private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.unicast(message, recipient);
+ clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
}
private void broadcastMessage(MessageSubject subject, Object event) {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
}
private void notifyPeers(InternalDeviceEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 33e4251..c46131e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -510,11 +510,7 @@
}
private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- serializer.encode(event));
- return clusterCommunicator.unicast(message, peer);
+ return clusterCommunicator.unicast(event, subject, serializer::encode, peer);
// Note: we had this flipped before...
// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 781e24b..f5807e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -22,6 +22,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
import com.hazelcast.core.IMap;
import org.apache.felix.scr.annotations.Activate;
@@ -35,6 +36,7 @@
import org.onlab.util.BoundedThreadPool;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
+import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
@@ -93,7 +95,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -360,22 +361,16 @@
log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), rule.deviceId());
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- FlowStoreMessageSubjects.GET_FLOW_ENTRY,
- SERIALIZER.encode(rule));
-
- try {
- Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
- }
- return null;
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
+ FlowStoreMessageSubjects.GET_FLOW_ENTRY,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
-
-
@Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
@@ -393,22 +388,16 @@
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GET_DEVICE_FLOW_ENTRIES,
- SERIALIZER.encode(deviceId));
-
- try {
- Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
- }
- return Collections.emptyList();
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
+ FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptyList());
}
-
-
@Override
public void storeFlowRule(FlowRule rule) {
storeBatch(new FlowRuleBatchOperation(
@@ -453,14 +442,10 @@
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
- ClusterMessage message = new ClusterMessage(
- local,
- APPLY_BATCH_FLOWS,
- SERIALIZER.encode(operation));
-
-
- if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
- log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
+ if (!clusterCommunicator.unicast(operation,
+ APPLY_BATCH_FLOWS, SERIALIZER::encode,
+ replicaInfo.master().get())) {
+ log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
Set<FlowRule> allFailures = operation.getOperations().stream()
.map(op -> op.target())
@@ -612,18 +597,15 @@
log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- REMOVE_FLOW_ENTRY,
- SERIALIZER.encode(rule));
-
- try {
- Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- // TODO: Retry against latest master or throw a FlowStoreException
- throw new RuntimeException(e);
- }
+ return Futures.get(clusterCommunicator.sendAndReceive(
+ rule,
+ REMOVE_FLOW_ENTRY,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ RuntimeException.class);
}
private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -649,12 +631,8 @@
if (nodeId == null) {
notifyDelegate(event);
} else {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- REMOTE_APPLY_COMPLETED,
- SERIALIZER.encode(event));
// TODO check unicast return value
- clusterCommunicator.unicast(message, nodeId);
+ clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
//error log: log.warn("Failed to respond to peer for batch operation result");
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
index 708faf6..b609a3c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
@@ -20,6 +20,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -45,7 +46,6 @@
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
-import org.onosproject.store.serializers.DecodeTo;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
@@ -199,18 +199,12 @@
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
- ClusterMessage message = new ClusterMessage(clusterService
- .getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation));
-
- try {
- ListenableFuture<byte[]> responseFuture = clusterCommunicator
- .sendAndReceive(message, replicaInfo.master().get());
- // here should add another decode process
- return Futures.transform(responseFuture,
- new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
- } catch (IOException e) {
- return Futures.immediateFailedFuture(e);
- }
+ return clusterCommunicator.sendAndReceive(
+ batchOperation,
+ APPLY_EXTEND_FLOWS,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get());
}
/**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 51b4111..ae1669b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -382,17 +382,13 @@
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupAddRequestMsg(groupDesc.deviceId(),
groupDesc);
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GroupStoreMessageSubjects.
- REMOTE_GROUP_OP_REQUEST,
- kryoBuilder.build().serialize(groupOp));
- if (!clusterCommunicator.unicast(message,
- mastershipService.
- getMasterFor(
- groupDesc.deviceId()))) {
+
+ if (!clusterCommunicator.unicast(groupOp,
+ GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ m -> kryoBuilder.build().serialize(m),
+ mastershipService.getMasterFor(groupDesc.deviceId()))) {
log.warn("Failed to send request to master: {} to {}",
- message,
+ groupOp,
mastershipService.getMasterFor(groupDesc.deviceId()));
//TODO: Send Group operation failure event
}
@@ -472,16 +468,13 @@
type,
newBuckets,
newAppCookie);
- ClusterMessage message =
- new ClusterMessage(clusterService.getLocalNode().id(),
- GroupStoreMessageSubjects.
- REMOTE_GROUP_OP_REQUEST,
- kryoBuilder.build().serialize(groupOp));
- if (!clusterCommunicator.unicast(message,
- mastershipService.
- getMasterFor(deviceId))) {
+
+ if (!clusterCommunicator.unicast(groupOp,
+ GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ m -> kryoBuilder.build().serialize(m),
+ mastershipService.getMasterFor(deviceId))) {
log.warn("Failed to send request to master: {} to {}",
- message,
+ groupOp,
mastershipService.getMasterFor(deviceId));
//TODO: Send Group operation failure event
}
@@ -584,16 +577,13 @@
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupDeleteRequestMsg(deviceId,
appCookie);
- ClusterMessage message =
- new ClusterMessage(clusterService.getLocalNode().id(),
- GroupStoreMessageSubjects.
- REMOTE_GROUP_OP_REQUEST,
- kryoBuilder.build().serialize(groupOp));
- if (!clusterCommunicator.unicast(message,
- mastershipService.
- getMasterFor(deviceId))) {
+
+ if (!clusterCommunicator.unicast(groupOp,
+ GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ m -> kryoBuilder.build().serialize(m),
+ mastershipService.getMasterFor(deviceId))) {
log.warn("Failed to send request to master: {} to {}",
- message,
+ groupOp,
mastershipService.getMasterFor(deviceId));
//TODO: Send Group operation failure event
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
index 74236c3..1e6f8aa 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
@@ -477,21 +477,13 @@
}
private void broadcastMessage(MessageSubject subject, Object event) {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
}
private void unicastMessage(NodeId peer,
MessageSubject subject,
Object event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.unicast(message, peer);
+ clusterCommunicator.unicast(event, subject, SERIALIZER::encode, peer);
}
private void notifyDelegateIfNotNull(HostEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 48c430b..5e09d05 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -334,17 +334,12 @@
LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
- ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
- GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
// TODO check unicast return value
- clusterCommunicator.unicast(linkInjectedMessage, dstNode);
- /* error log:
- log.warn("Failed to process link update between src: {} and dst: {} " +
- "(cluster messaging failed: {})",
- linkDescription.src(), linkDescription.dst(), e);
- */
-
+ clusterCommunicator.unicast(linkInjectedEvent,
+ GossipLinkStoreMessageSubjects.LINK_INJECTED,
+ SERIALIZER::encode,
+ dstNode);
}
return linkEvent;
@@ -653,19 +648,11 @@
}
private void broadcastMessage(MessageSubject subject, Object event) {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
}
private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.unicast(message, recipient);
+ clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
}
private void notifyPeers(InternalLinkEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 25870b7..8a54502 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -181,20 +181,14 @@
} else {
return MastershipRole.NONE;
}
- } else {
- try {
- MastershipRole role = complete(clusterCommunicator.sendAndReceive(
- new ClusterMessage(
- localNodeId,
- ROLE_QUERY_SUBJECT,
- SERIALIZER.encode(deviceId)),
- nodeId));
- return role == null ? MastershipRole.NONE : role;
- } catch (IOException e) {
- log.warn("Failed to query {} for {}'s role. Defaulting to NONE", nodeId, deviceId, e);
- return MastershipRole.NONE;
- }
}
+ MastershipRole role = complete(clusterCommunicator.sendAndReceive(
+ deviceId,
+ ROLE_QUERY_SUBJECT,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ nodeId));
+ return role == null ? MastershipRole.NONE : role;
}
@Override
@@ -276,17 +270,12 @@
if (!nodeId.equals(localNodeId)) {
log.debug("Forwarding request to relinquish "
+ "role for device {} to {}", deviceId, nodeId);
- try {
- return complete(clusterCommunicator.sendAndReceive(
- new ClusterMessage(
- localNodeId,
- ROLE_RELINQUISH_SUBJECT,
- SERIALIZER.encode(deviceId)),
- nodeId));
- } catch (IOException e) {
- log.warn("Failed to send a request to relinquish role for {} to {}", deviceId, nodeId, e);
- return null;
- }
+ return complete(clusterCommunicator.sendAndReceive(
+ deviceId,
+ ROLE_RELINQUISH_SUBJECT,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ nodeId));
}
// Check if this node is can be managed by this node.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 7ee8712..8bf8ae6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -131,9 +131,7 @@
}
// TODO check unicast return value
- communicationService.unicast(new ClusterMessage(myId, PACKET_OUT_SUBJECT,
- SERIALIZER.encode(packet)),
- master);
+ communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
// error log: log.warn("Failed to send packet-out to {}", master);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index f07c0e8..e314789 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -16,6 +16,7 @@
package org.onosproject.store.statistic.impl;
import com.google.common.collect.Sets;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -23,6 +24,7 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
@@ -47,12 +49,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.onlab.util.Tools.groupedThreads;
@@ -218,20 +217,15 @@
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getCurrentStatisticInternal(connectPoint);
} else {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GET_CURRENT,
- SERIALIZER.encode(connectPoint));
-
- try {
- Future<byte[]> response =
- clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
- return Collections.emptySet();
- }
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ connectPoint,
+ GET_CURRENT,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptySet());
}
}
@@ -251,22 +245,16 @@
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getPreviousStatisticInternal(connectPoint);
} else {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GET_PREVIOUS,
- SERIALIZER.encode(connectPoint));
-
- try {
- Future<byte[]> response =
- clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
- return Collections.emptySet();
- }
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ connectPoint,
+ GET_PREVIOUS,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptySet());
}
-
}
private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {