blob: f6384445ed0f80de8bc0b66cbdd2ab0447d857c8 [file] [log] [blame]
Madan Jampani9b19a822014-11-04 21:37:13 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.io.IOException;
6import java.util.concurrent.CompletableFuture;
7import java.util.concurrent.ExecutionException;
8import java.util.concurrent.ScheduledExecutorService;
9import java.util.concurrent.ScheduledThreadPoolExecutor;
10import java.util.concurrent.ThreadFactory;
11import java.util.concurrent.TimeUnit;
12import java.util.concurrent.TimeoutException;
13
14import net.kuujo.copycat.protocol.PingRequest;
15import net.kuujo.copycat.protocol.PingResponse;
16import net.kuujo.copycat.protocol.PollRequest;
17import net.kuujo.copycat.protocol.PollResponse;
18import net.kuujo.copycat.protocol.SubmitRequest;
19import net.kuujo.copycat.protocol.SubmitResponse;
20import net.kuujo.copycat.protocol.SyncRequest;
21import net.kuujo.copycat.protocol.SyncResponse;
22import net.kuujo.copycat.spi.protocol.ProtocolClient;
23
24import org.onlab.onos.cluster.ControllerNode;
25import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
26import org.onlab.onos.store.cluster.messaging.ClusterMessage;
27import org.onlab.onos.store.cluster.messaging.MessageSubject;
28import org.slf4j.Logger;
29
30import com.google.common.util.concurrent.ThreadFactoryBuilder;
31
32/**
33 * Licensed to the Apache Software Foundation (ASF) under one
34 * or more contributor license agreements. See the NOTICE file
35 * distributed with this work for additional information
36 * regarding copyright ownership. The ASF licenses this file
37 * to you under the Apache License, Version 2.0 (the
38 * "License"); you may not use this file except in compliance
39 * with the License. You may obtain a copy of the License at
40 *
41 * http://www.apache.org/licenses/LICENSE-2.0
42
43 * Unless required by applicable law or agreed to in writing,
44 * software distributed under the License is distributed on an
45 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
46 * KIND, either express or implied. See the License for the
47 * specific language governing permissions and limitations under
48 * the License.
49 */
50
51public class ClusterMessagingProtocolClient implements ProtocolClient {
52
53 private final Logger log = getLogger(getClass());
54
55 private static final ThreadFactory THREAD_FACTORY =
56 new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
57
58 public static final long RETRY_INTERVAL_MILLIS = 2000;
59
60 private final ClusterCommunicationService clusterCommunicator;
61 private final ControllerNode remoteNode;
62
63 // FIXME: Thread pool sizing.
64 private static final ScheduledExecutorService THREAD_POOL =
65 new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
66
67 public ClusterMessagingProtocolClient(
68 ClusterCommunicationService clusterCommunicator,
69 ControllerNode remoteNode) {
70 this.clusterCommunicator = clusterCommunicator;
71 this.remoteNode = remoteNode;
72 }
73
74 @Override
75 public CompletableFuture<PingResponse> ping(PingRequest request) {
76 return requestReply(request);
77 }
78
79 @Override
80 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
81 return requestReply(request);
82 }
83
84 @Override
85 public CompletableFuture<PollResponse> poll(PollRequest request) {
86 return requestReply(request);
87 }
88
89 @Override
90 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
91 return requestReply(request);
92 }
93
94 @Override
95 public CompletableFuture<Void> connect() {
96 return CompletableFuture.completedFuture(null);
97 }
98
99 @Override
100 public CompletableFuture<Void> close() {
101 return CompletableFuture.completedFuture(null);
102 }
103
104 public <I> MessageSubject messageType(I input) {
105 Class<?> clazz = input.getClass();
106 if (clazz.equals(PollRequest.class)) {
107 return ClusterMessagingProtocol.COPYCAT_POLL;
108 } else if (clazz.equals(SyncRequest.class)) {
109 return ClusterMessagingProtocol.COPYCAT_SYNC;
110 } else if (clazz.equals(SubmitRequest.class)) {
111 return ClusterMessagingProtocol.COPYCAT_SUBMIT;
112 } else if (clazz.equals(PingRequest.class)) {
113 return ClusterMessagingProtocol.COPYCAT_PING;
114 } else {
115 throw new IllegalArgumentException("Unknown class " + clazz.getName());
116 }
117
118 }
119
120 private <I, O> CompletableFuture<O> requestReply(I request) {
121 CompletableFuture<O> future = new CompletableFuture<>();
122 THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
123 return future;
124 }
125
126 private class RPCTask<I, O> implements Runnable {
127
128 private final ClusterMessage message;
129 private final CompletableFuture<O> future;
130
131 public RPCTask(I request, CompletableFuture<O> future) {
132 this.message =
133 new ClusterMessage(
134 null,
135 messageType(request),
136 ClusterMessagingProtocol.SERIALIZER.encode(request));
137 this.future = future;
138 }
139
140 @Override
141 public void run() {
142 try {
143 byte[] response = clusterCommunicator
144 .sendAndReceive(message, remoteNode.id())
145 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
146 future.complete(ClusterMessagingProtocol.SERIALIZER.decode(response));
147
148 } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
149 if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) ||
150 message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) {
151 log.warn("Request to {} failed. Will retry "
152 + "in {} ms", remoteNode, RETRY_INTERVAL_MILLIS);
153 THREAD_POOL.schedule(
154 this,
155 RETRY_INTERVAL_MILLIS,
156 TimeUnit.MILLISECONDS);
157 } else {
158 future.completeExceptionally(e);
159 }
160 } catch (Exception e) {
161 future.completeExceptionally(e);
162 }
163 }
164 }
165}