blob: 39ed6c3624bc799e70d627c7a7b1c7d3665e00e6 [file] [log] [blame]
Madan Jampani630e7ac2016-05-31 11:34:05 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
19
20import java.net.ConnectException;
21import java.nio.channels.ClosedChannelException;
22import java.util.concurrent.CompletableFuture;
23import java.util.concurrent.Executors;
24import java.util.concurrent.ScheduledExecutorService;
25import java.util.concurrent.TimeUnit;
26import java.util.concurrent.TimeoutException;
27import java.util.function.Predicate;
28
29import org.slf4j.Logger;
30
31import com.google.common.base.Throwables;
32
33import io.atomix.catalyst.transport.TransportException;
34import io.atomix.copycat.Query;
35import io.atomix.copycat.client.CopycatClient;
36import io.atomix.copycat.error.QueryException;
37import io.atomix.copycat.error.UnknownSessionException;
38import io.atomix.copycat.session.ClosedSessionException;
39
40/**
41 * {@code CopycatClient} that can retry when certain recoverable errors are encoutered.
42 */
43public class QueryRetryingCopycatClient extends DelegatingCopycatClient {
44
45 private final int maxRetries;
46 private final long delayBetweenRetriesMillis;
47 private final ScheduledExecutorService executor;
48 private final Logger log = getLogger(getClass());
49
50 private final Predicate<Throwable> retryableCheck = e -> e instanceof ConnectException
51 || e instanceof TimeoutException
52 || e instanceof TransportException
53 || e instanceof ClosedChannelException
54 || e instanceof QueryException
55 || e instanceof UnknownSessionException
56 || e instanceof ClosedSessionException;
57
58 QueryRetryingCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) {
59 super(client);
60 this.maxRetries = maxRetries;
61 this.delayBetweenRetriesMillis = delayBetweenRetriesMillis;
62 this.executor = Executors.newSingleThreadScheduledExecutor();
63 }
64
65 @Override
66 public CompletableFuture<Void> close() {
67 executor.shutdown();
68 return super.close();
69 }
70
71 @Override
72 public <T> CompletableFuture<T> submit(Query<T> query) {
73 CompletableFuture<T> future = new CompletableFuture<>();
74 executor.submit(() -> submit(query, 1, future));
75 return future;
76 }
77
78 private <T> void submit(Query<T> query, int attemptIndex, CompletableFuture<T> future) {
79 client.submit(query).whenComplete((r, e) -> {
80 if (e != null) {
81 if (attemptIndex < maxRetries + 1 && retryableCheck.test(Throwables.getRootCause(e))) {
82 log.debug("Retry attempt ({} of {}). Failure due to {}",
83 attemptIndex, maxRetries, Throwables.getRootCause(e).getClass());
84 executor.schedule(() ->
85 submit(query, attemptIndex + 1, future), delayBetweenRetriesMillis, TimeUnit.MILLISECONDS);
86 } else {
87 future.completeExceptionally(e);
88 }
89 } else {
90 future.complete(r);
91 }
92 });
93 }
94}