blob: e40665f27c15ea849ae94b79c5dddd68d7cb7cb6 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Madan Jampani27b69c62015-05-15 15:49:02 -070017
18import java.net.URI;
19import java.nio.ByteBuffer;
20import java.util.concurrent.CompletableFuture;
21
22import org.onlab.util.Tools;
23import org.onosproject.cluster.ClusterService;
24import org.onosproject.cluster.ControllerNode;
25import org.onosproject.cluster.NodeId;
26import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
27import org.onosproject.store.cluster.messaging.MessageSubject;
28
29import net.kuujo.copycat.protocol.AbstractProtocol;
30import net.kuujo.copycat.protocol.ProtocolClient;
31import net.kuujo.copycat.protocol.ProtocolHandler;
32import net.kuujo.copycat.protocol.ProtocolServer;
33import net.kuujo.copycat.util.Configurable;
34
35/**
36 * Protocol for Copycat communication that employs
37 * {@code ClusterCommunicationService}.
38 */
39public class CopycatCommunicationProtocol extends AbstractProtocol {
40
41 private static final MessageSubject COPYCAT_MESSAGE_SUBJECT =
42 new MessageSubject("onos-copycat-message");
43
44 protected ClusterService clusterService;
45 protected ClusterCommunicationService clusterCommunicator;
46
47 public CopycatCommunicationProtocol(ClusterService clusterService,
48 ClusterCommunicationService clusterCommunicator) {
49 this.clusterService = clusterService;
50 this.clusterCommunicator = clusterCommunicator;
51 }
52
53 @Override
54 public Configurable copy() {
55 return this;
56 }
57
58 @Override
59 public ProtocolClient createClient(URI uri) {
60 NodeId nodeId = uriToNodeId(uri);
61 if (nodeId == null) {
62 throw new IllegalStateException("Unknown peer " + uri);
63 }
64 return new Client(nodeId);
65 }
66
67 @Override
68 public ProtocolServer createServer(URI uri) {
69 return new Server();
70 }
71
72 private class Server implements ProtocolServer {
73
74 @Override
75 public void handler(ProtocolHandler handler) {
76 if (handler == null) {
77 clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
78 } else {
79 clusterCommunicator.addSubscriber(COPYCAT_MESSAGE_SUBJECT,
80 ByteBuffer::wrap,
81 handler,
82 Tools::byteBuffertoArray);
83 // FIXME: Tools::byteBuffertoArray involves a array copy.
84 }
85 }
86
87 @Override
88 public CompletableFuture<Void> listen() {
89 return CompletableFuture.completedFuture(null);
90 }
91
92 @Override
93 public CompletableFuture<Void> close() {
94 clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
95 return CompletableFuture.completedFuture(null);
96 }
97 }
98
99 private class Client implements ProtocolClient {
100 private final NodeId peer;
101
102 public Client(NodeId peer) {
103 this.peer = peer;
104 }
105
106 @Override
107 public CompletableFuture<ByteBuffer> write(ByteBuffer request) {
108 return clusterCommunicator.sendAndReceive(request,
109 COPYCAT_MESSAGE_SUBJECT,
110 Tools::byteBuffertoArray,
111 ByteBuffer::wrap,
112 peer);
113 }
114
115 @Override
116 public CompletableFuture<Void> connect() {
117 return CompletableFuture.completedFuture(null);
118 }
119
120 @Override
121 public CompletableFuture<Void> close() {
122 return CompletableFuture.completedFuture(null);
123 }
124 }
125
126 private NodeId uriToNodeId(URI uri) {
127 return clusterService.getNodes()
128 .stream()
129 .filter(node -> uri.getHost().equals(node.ip().toString()))
130 .map(ControllerNode::id)
131 .findAny()
132 .orElse(null);
133 }
134}