blob: 7ff6b89cce64778b8400622e3c357c16450a9b37 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.consistent.impl;
17
Flavio Castro41b1f3a2015-07-31 13:51:32 -070018import org.onosproject.store.service.AsyncAtomicCounter;
19import org.slf4j.Logger;
20
Madan Jampanib5d72d52015-04-03 16:53:50 -070021import java.util.concurrent.CompletableFuture;
Madan Jampanif4d58f32015-06-05 17:38:22 -070022import java.util.concurrent.ScheduledExecutorService;
23import java.util.concurrent.TimeUnit;
24import java.util.function.BiFunction;
Madan Jampani04aeb452015-05-02 16:12:24 -070025
Flavio Castro41b1f3a2015-07-31 13:51:32 -070026import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampanif4d58f32015-06-05 17:38:22 -070027import static org.slf4j.LoggerFactory.getLogger;
Madan Jampanib5d72d52015-04-03 16:53:50 -070028
29/**
30 * Default implementation for a distributed AsyncAtomicCounter backed by
31 * partitioned Raft DB.
32 * <p>
33 * The initial value will be zero.
34 */
35public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
36
37 private final String name;
38 private final Database database;
Madan Jampanif4d58f32015-06-05 17:38:22 -070039 private final boolean retryOnFailure;
40 private final ScheduledExecutorService retryExecutor;
41 // TODO: configure delay via builder
42 private static final int DELAY_BETWEEN_RETRY_SEC = 1;
43 private final Logger log = getLogger(getClass());
Flavio Castro41b1f3a2015-07-31 13:51:32 -070044 private final MeteringAgent monitor;
45
46 private static final String PRIMITIVE_NAME = "atomicCounter";
47 private static final String INCREMENT_AND_GET = "incrementAndGet";
48 private static final String GET_AND_INCREMENT = "getAndIncrement";
49 private static final String GET_AND_ADD = "getAndAdd";
50 private static final String ADD_AND_GET = "addAndGet";
51 private static final String GET = "get";
Madan Jampanib5d72d52015-04-03 16:53:50 -070052
Madan Jampanif4d58f32015-06-05 17:38:22 -070053 public DefaultAsyncAtomicCounter(String name,
Flavio Castro41b1f3a2015-07-31 13:51:32 -070054 Database database,
55 boolean retryOnException,
56 boolean meteringEnabled,
57 ScheduledExecutorService retryExecutor) {
Madan Jampanib5d72d52015-04-03 16:53:50 -070058 this.name = checkNotNull(name);
59 this.database = checkNotNull(database);
Madan Jampanif4d58f32015-06-05 17:38:22 -070060 this.retryOnFailure = retryOnException;
61 this.retryExecutor = retryExecutor;
Flavio Castro41b1f3a2015-07-31 13:51:32 -070062 this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
Madan Jampanib5d72d52015-04-03 16:53:50 -070063 }
64
65 @Override
66 public CompletableFuture<Long> incrementAndGet() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070067 final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET);
68 return addAndGet(1L)
69 .whenComplete((r, e) -> timer.stop());
Madan Jampanib5d72d52015-04-03 16:53:50 -070070 }
71
72 @Override
73 public CompletableFuture<Long> get() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070074 final MeteringAgent.Context timer = monitor.startTimer(GET);
75 return database.counterGet(name)
76 .whenComplete((r, e) -> timer.stop());
Madan Jampani04aeb452015-05-02 16:12:24 -070077 }
78
79 @Override
80 public CompletableFuture<Long> getAndIncrement() {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070081 final MeteringAgent.Context timer = monitor.startTimer(GET_AND_INCREMENT);
82 return getAndAdd(1L)
83 .whenComplete((r, e) -> timer.stop());
Madan Jampani04aeb452015-05-02 16:12:24 -070084 }
85
86 @Override
87 public CompletableFuture<Long> getAndAdd(long delta) {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070088 final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD);
Madan Jampanif4d58f32015-06-05 17:38:22 -070089 CompletableFuture<Long> result = database.counterGetAndAdd(name, delta);
90 if (!retryOnFailure) {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070091 return result
92 .whenComplete((r, e) -> timer.stop());
Madan Jampanif4d58f32015-06-05 17:38:22 -070093 }
94
95 CompletableFuture<Long> future = new CompletableFuture<>();
96 return result.whenComplete((r, e) -> {
Flavio Castro41b1f3a2015-07-31 13:51:32 -070097 timer.stop();
98 // TODO : Account for retries
Madan Jampanif4d58f32015-06-05 17:38:22 -070099 if (e != null) {
100 log.warn("getAndAdd failed due to {}. Will retry", e.getMessage());
101 retryExecutor.schedule(new RetryTask(database::counterGetAndAdd, delta, future),
Flavio Castro41b1f3a2015-07-31 13:51:32 -0700102 DELAY_BETWEEN_RETRY_SEC,
103 TimeUnit.SECONDS);
Madan Jampanif4d58f32015-06-05 17:38:22 -0700104 } else {
105 future.complete(r);
106 }
Flavio Castro41b1f3a2015-07-31 13:51:32 -0700107 }).thenCompose(v -> future);
Madan Jampani04aeb452015-05-02 16:12:24 -0700108 }
109
110 @Override
111 public CompletableFuture<Long> addAndGet(long delta) {
Flavio Castro41b1f3a2015-07-31 13:51:32 -0700112 final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET);
Madan Jampanif4d58f32015-06-05 17:38:22 -0700113 CompletableFuture<Long> result = database.counterAddAndGet(name, delta);
114 if (!retryOnFailure) {
Flavio Castro41b1f3a2015-07-31 13:51:32 -0700115 return result
116 .whenComplete((r, e) -> timer.stop());
Madan Jampanif4d58f32015-06-05 17:38:22 -0700117 }
118
119 CompletableFuture<Long> future = new CompletableFuture<>();
120 return result.whenComplete((r, e) -> {
Flavio Castro41b1f3a2015-07-31 13:51:32 -0700121 timer.stop();
122 // TODO : Account for retries
Madan Jampanif4d58f32015-06-05 17:38:22 -0700123 if (e != null) {
124 log.warn("addAndGet failed due to {}. Will retry", e.getMessage());
125 retryExecutor.schedule(new RetryTask(database::counterAddAndGet, delta, future),
Flavio Castro41b1f3a2015-07-31 13:51:32 -0700126 DELAY_BETWEEN_RETRY_SEC,
127 TimeUnit.SECONDS);
Madan Jampanif4d58f32015-06-05 17:38:22 -0700128 } else {
129 future.complete(r);
130 }
131 }).thenCompose(v -> future);
Madan Jampanib5d72d52015-04-03 16:53:50 -0700132 }
Madan Jampanif4d58f32015-06-05 17:38:22 -0700133
134 private class RetryTask implements Runnable {
135
136 private final BiFunction<String, Long, CompletableFuture<Long>> function;
137 private final Long delta;
138 private final CompletableFuture<Long> result;
139
140 public RetryTask(BiFunction<String, Long, CompletableFuture<Long>> function,
141 Long delta,
142 CompletableFuture<Long> result) {
143 this.function = function;
144 this.delta = delta;
145 this.result = result;
146 }
147
148 @Override
149 public void run() {
150 function.apply(name, delta).whenComplete((r, e) -> {
151 if (e == null) {
152 result.complete(r);
153 } else {
154 log.warn("{} retry failed due to {}. Will try again...", function, e.getMessage());
155 // TODO: Exponential backoff
156 // TODO: limit retries
157 retryExecutor.schedule(this, DELAY_BETWEEN_RETRY_SEC, TimeUnit.SECONDS);
158 }
159 });
160 }
161 }
162}