/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.impl;

import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;

import java.net.ConnectException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

import org.onlab.util.Tools;
import org.onosproject.store.service.StorageException;
import org.slf4j.Logger;

import com.google.common.base.Throwables;

import io.atomix.catalyst.transport.TransportException;
import io.atomix.copycat.Query;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.error.QueryException;
import io.atomix.copycat.error.UnknownSessionException;
import io.atomix.copycat.session.ClosedSessionException;

43/**
Madan Jampani825a8b12016-06-06 19:42:01 -070044 * Custom {@code CopycatClient} for injecting additional logic that runs before/after operation submission.
Madan Jampani630e7ac2016-05-31 11:34:05 -070045 */
Madan Jampani825a8b12016-06-06 19:42:01 -070046public class OnosCopycatClient extends DelegatingCopycatClient {
Madan Jampani630e7ac2016-05-31 11:34:05 -070047
48 private final int maxRetries;
49 private final long delayBetweenRetriesMillis;
50 private final ScheduledExecutorService executor;
51 private final Logger log = getLogger(getClass());
52
53 private final Predicate<Throwable> retryableCheck = e -> e instanceof ConnectException
54 || e instanceof TimeoutException
55 || e instanceof TransportException
56 || e instanceof ClosedChannelException
57 || e instanceof QueryException
58 || e instanceof UnknownSessionException
Madan Jampani21a71492016-06-16 16:53:04 -070059 || e instanceof ClosedSessionException
60 || e instanceof StorageException.Unavailable;
Madan Jampani630e7ac2016-05-31 11:34:05 -070061
Madan Jampani825a8b12016-06-06 19:42:01 -070062 OnosCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) {
Madan Jampani630e7ac2016-05-31 11:34:05 -070063 super(client);
64 this.maxRetries = maxRetries;
65 this.delayBetweenRetriesMillis = delayBetweenRetriesMillis;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070066 this.executor = newSingleThreadScheduledExecutor(groupedThreads("OnosCopycat", "client", log));
Madan Jampani630e7ac2016-05-31 11:34:05 -070067 }
68
69 @Override
70 public CompletableFuture<Void> close() {
71 executor.shutdown();
72 return super.close();
73 }
74
75 @Override
76 public <T> CompletableFuture<T> submit(Query<T> query) {
Madan Jampanib2cfa6a2016-06-21 10:47:56 -070077 if (state() == State.CLOSED) {
Madan Jampani825a8b12016-06-06 19:42:01 -070078 return Tools.exceptionalFuture(new StorageException.Unavailable());
79 }
Madan Jampani630e7ac2016-05-31 11:34:05 -070080 CompletableFuture<T> future = new CompletableFuture<>();
81 executor.submit(() -> submit(query, 1, future));
82 return future;
83 }
84
85 private <T> void submit(Query<T> query, int attemptIndex, CompletableFuture<T> future) {
86 client.submit(query).whenComplete((r, e) -> {
87 if (e != null) {
88 if (attemptIndex < maxRetries + 1 && retryableCheck.test(Throwables.getRootCause(e))) {
89 log.debug("Retry attempt ({} of {}). Failure due to {}",
90 attemptIndex, maxRetries, Throwables.getRootCause(e).getClass());
91 executor.schedule(() ->
92 submit(query, attemptIndex + 1, future), delayBetweenRetriesMillis, TimeUnit.MILLISECONDS);
93 } else {
94 future.completeExceptionally(e);
95 }
96 } else {
97 future.complete(r);
98 }
99 });
100 }
101}