blob: 628ff71f8e3e8c63282ba7f21b4971d6999c0cc1 [file] [log] [blame]
Madan Jampani27b69c62015-05-15 15:49:02 -07001package org.onosproject.store.consistent.impl;
2
3import java.net.URI;
4import java.nio.ByteBuffer;
5import java.util.concurrent.CompletableFuture;
6
7import org.onlab.util.Tools;
8import org.onosproject.cluster.ClusterService;
9import org.onosproject.cluster.ControllerNode;
10import org.onosproject.cluster.NodeId;
11import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
12import org.onosproject.store.cluster.messaging.MessageSubject;
13
14import net.kuujo.copycat.protocol.AbstractProtocol;
15import net.kuujo.copycat.protocol.ProtocolClient;
16import net.kuujo.copycat.protocol.ProtocolHandler;
17import net.kuujo.copycat.protocol.ProtocolServer;
18import net.kuujo.copycat.util.Configurable;
19
20/**
21 * Protocol for Copycat communication that employs
22 * {@code ClusterCommunicationService}.
23 */
24public class CopycatCommunicationProtocol extends AbstractProtocol {
25
26 private static final MessageSubject COPYCAT_MESSAGE_SUBJECT =
27 new MessageSubject("onos-copycat-message");
28
29 protected ClusterService clusterService;
30 protected ClusterCommunicationService clusterCommunicator;
31
32 public CopycatCommunicationProtocol(ClusterService clusterService,
33 ClusterCommunicationService clusterCommunicator) {
34 this.clusterService = clusterService;
35 this.clusterCommunicator = clusterCommunicator;
36 }
37
38 @Override
39 public Configurable copy() {
40 return this;
41 }
42
43 @Override
44 public ProtocolClient createClient(URI uri) {
45 NodeId nodeId = uriToNodeId(uri);
46 if (nodeId == null) {
47 throw new IllegalStateException("Unknown peer " + uri);
48 }
49 return new Client(nodeId);
50 }
51
52 @Override
53 public ProtocolServer createServer(URI uri) {
54 return new Server();
55 }
56
57 private class Server implements ProtocolServer {
58
59 @Override
60 public void handler(ProtocolHandler handler) {
61 if (handler == null) {
62 clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
63 } else {
64 clusterCommunicator.addSubscriber(COPYCAT_MESSAGE_SUBJECT,
65 ByteBuffer::wrap,
66 handler,
67 Tools::byteBuffertoArray);
68 // FIXME: Tools::byteBuffertoArray involves a array copy.
69 }
70 }
71
72 @Override
73 public CompletableFuture<Void> listen() {
74 return CompletableFuture.completedFuture(null);
75 }
76
77 @Override
78 public CompletableFuture<Void> close() {
79 clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
80 return CompletableFuture.completedFuture(null);
81 }
82 }
83
84 private class Client implements ProtocolClient {
85 private final NodeId peer;
86
87 public Client(NodeId peer) {
88 this.peer = peer;
89 }
90
91 @Override
92 public CompletableFuture<ByteBuffer> write(ByteBuffer request) {
93 return clusterCommunicator.sendAndReceive(request,
94 COPYCAT_MESSAGE_SUBJECT,
95 Tools::byteBuffertoArray,
96 ByteBuffer::wrap,
97 peer);
98 }
99
100 @Override
101 public CompletableFuture<Void> connect() {
102 return CompletableFuture.completedFuture(null);
103 }
104
105 @Override
106 public CompletableFuture<Void> close() {
107 return CompletableFuture.completedFuture(null);
108 }
109 }
110
111 private NodeId uriToNodeId(URI uri) {
112 return clusterService.getNodes()
113 .stream()
114 .filter(node -> uri.getHost().equals(node.ip().toString()))
115 .map(ControllerNode::id)
116 .findAny()
117 .orElse(null);
118 }
119}