blob: a170afb4505f2914c8328072cfb67bb04e2b3763 [file] [log] [blame]
Madan Jampani630e7ac2016-05-31 11:34:05 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
19
20import java.net.ConnectException;
21import java.nio.channels.ClosedChannelException;
22import java.util.concurrent.CompletableFuture;
23import java.util.concurrent.Executors;
24import java.util.concurrent.ScheduledExecutorService;
25import java.util.concurrent.TimeUnit;
26import java.util.concurrent.TimeoutException;
27import java.util.function.Predicate;
28
Madan Jampani825a8b12016-06-06 19:42:01 -070029import org.onlab.util.Tools;
30import org.onosproject.store.service.StorageException;
Madan Jampani630e7ac2016-05-31 11:34:05 -070031import org.slf4j.Logger;
32
33import com.google.common.base.Throwables;
34
35import io.atomix.catalyst.transport.TransportException;
36import io.atomix.copycat.Query;
37import io.atomix.copycat.client.CopycatClient;
38import io.atomix.copycat.error.QueryException;
39import io.atomix.copycat.error.UnknownSessionException;
40import io.atomix.copycat.session.ClosedSessionException;
41
42/**
Madan Jampani825a8b12016-06-06 19:42:01 -070043 * Custom {@code CopycatClient} for injecting additional logic that runs before/after operation submission.
Madan Jampani630e7ac2016-05-31 11:34:05 -070044 */
Madan Jampani825a8b12016-06-06 19:42:01 -070045public class OnosCopycatClient extends DelegatingCopycatClient {
Madan Jampani630e7ac2016-05-31 11:34:05 -070046
47 private final int maxRetries;
48 private final long delayBetweenRetriesMillis;
49 private final ScheduledExecutorService executor;
50 private final Logger log = getLogger(getClass());
51
52 private final Predicate<Throwable> retryableCheck = e -> e instanceof ConnectException
53 || e instanceof TimeoutException
54 || e instanceof TransportException
55 || e instanceof ClosedChannelException
56 || e instanceof QueryException
57 || e instanceof UnknownSessionException
58 || e instanceof ClosedSessionException;
59
Madan Jampani825a8b12016-06-06 19:42:01 -070060 OnosCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) {
Madan Jampani630e7ac2016-05-31 11:34:05 -070061 super(client);
62 this.maxRetries = maxRetries;
63 this.delayBetweenRetriesMillis = delayBetweenRetriesMillis;
64 this.executor = Executors.newSingleThreadScheduledExecutor();
65 }
66
67 @Override
68 public CompletableFuture<Void> close() {
69 executor.shutdown();
70 return super.close();
71 }
72
73 @Override
74 public <T> CompletableFuture<T> submit(Query<T> query) {
Madan Jampani825a8b12016-06-06 19:42:01 -070075 if (state() == State.SUSPENDED || state() == State.CLOSED) {
76 return Tools.exceptionalFuture(new StorageException.Unavailable());
77 }
Madan Jampani630e7ac2016-05-31 11:34:05 -070078 CompletableFuture<T> future = new CompletableFuture<>();
79 executor.submit(() -> submit(query, 1, future));
80 return future;
81 }
82
83 private <T> void submit(Query<T> query, int attemptIndex, CompletableFuture<T> future) {
84 client.submit(query).whenComplete((r, e) -> {
85 if (e != null) {
86 if (attemptIndex < maxRetries + 1 && retryableCheck.test(Throwables.getRootCause(e))) {
87 log.debug("Retry attempt ({} of {}). Failure due to {}",
88 attemptIndex, maxRetries, Throwables.getRootCause(e).getClass());
89 executor.schedule(() ->
90 submit(query, attemptIndex + 1, future), delayBetweenRetriesMillis, TimeUnit.MILLISECONDS);
91 } else {
92 future.completeExceptionally(e);
93 }
94 } else {
95 future.complete(r);
96 }
97 });
98 }
99}