blob: d541bb1b6c15837087bcedb4957a983b594ea3e2 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.maxPriority;
import static org.slf4j.LoggerFactory.getLogger;

import java.net.ConnectException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

import org.onlab.util.Tools;
import org.onosproject.store.service.StorageException;
import org.slf4j.Logger;

import com.google.common.base.Throwables;

import io.atomix.catalyst.transport.TransportException;
import io.atomix.copycat.Query;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.error.QueryException;
import io.atomix.copycat.error.UnknownSessionException;
import io.atomix.copycat.session.ClosedSessionException;
43
44/**
Madan Jampani825a8b12016-06-06 19:42:01 -070045 * Custom {@code CopycatClient} for injecting additional logic that runs before/after operation submission.
Madan Jampani630e7ac2016-05-31 11:34:05 -070046 */
Madan Jampani825a8b12016-06-06 19:42:01 -070047public class OnosCopycatClient extends DelegatingCopycatClient {
Madan Jampani630e7ac2016-05-31 11:34:05 -070048
49 private final int maxRetries;
50 private final long delayBetweenRetriesMillis;
51 private final ScheduledExecutorService executor;
52 private final Logger log = getLogger(getClass());
53
54 private final Predicate<Throwable> retryableCheck = e -> e instanceof ConnectException
55 || e instanceof TimeoutException
56 || e instanceof TransportException
57 || e instanceof ClosedChannelException
58 || e instanceof QueryException
59 || e instanceof UnknownSessionException
Madan Jampani21a71492016-06-16 16:53:04 -070060 || e instanceof ClosedSessionException
61 || e instanceof StorageException.Unavailable;
Madan Jampani630e7ac2016-05-31 11:34:05 -070062
Madan Jampani825a8b12016-06-06 19:42:01 -070063 OnosCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) {
Madan Jampani630e7ac2016-05-31 11:34:05 -070064 super(client);
65 this.maxRetries = maxRetries;
66 this.delayBetweenRetriesMillis = delayBetweenRetriesMillis;
Yuta HIGUCHIdec159a2016-12-19 14:52:41 -080067 this.executor = newSingleThreadScheduledExecutor(maxPriority(groupedThreads("OnosCopycat", "client", log)));
Madan Jampani630e7ac2016-05-31 11:34:05 -070068 }
69
70 @Override
71 public CompletableFuture<Void> close() {
72 executor.shutdown();
73 return super.close();
74 }
75
76 @Override
77 public <T> CompletableFuture<T> submit(Query<T> query) {
Madan Jampanib2cfa6a2016-06-21 10:47:56 -070078 if (state() == State.CLOSED) {
Madan Jampani825a8b12016-06-06 19:42:01 -070079 return Tools.exceptionalFuture(new StorageException.Unavailable());
80 }
Madan Jampani630e7ac2016-05-31 11:34:05 -070081 CompletableFuture<T> future = new CompletableFuture<>();
Yuta HIGUCHIdec159a2016-12-19 14:52:41 -080082 executor.execute(() -> submit(query, 1, future));
Madan Jampani630e7ac2016-05-31 11:34:05 -070083 return future;
84 }
85
86 private <T> void submit(Query<T> query, int attemptIndex, CompletableFuture<T> future) {
87 client.submit(query).whenComplete((r, e) -> {
88 if (e != null) {
89 if (attemptIndex < maxRetries + 1 && retryableCheck.test(Throwables.getRootCause(e))) {
90 log.debug("Retry attempt ({} of {}). Failure due to {}",
91 attemptIndex, maxRetries, Throwables.getRootCause(e).getClass());
92 executor.schedule(() ->
93 submit(query, attemptIndex + 1, future), delayBetweenRetriesMillis, TimeUnit.MILLISECONDS);
94 } else {
95 future.completeExceptionally(e);
96 }
97 } else {
98 future.complete(r);
99 }
100 });
101 }
102}