blob: 56511a100522489c1184622ae4abeb025ad52e13 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import io.atomix.copycat.client.CopycatClient;
Madan Jampani65f24bb2016-03-15 15:16:18 -070019import io.atomix.resource.AbstractResource;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.ResourceTypeInfo;
21
Madan Jampanid7ff34d2016-05-26 18:20:55 -070022import java.util.Collection;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080023import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080024import java.util.Map;
Madan Jampani65f24bb2016-03-15 15:16:18 -070025import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080026import java.util.Set;
27import java.util.concurrent.CompletableFuture;
28import java.util.function.Consumer;
29
30import org.onosproject.cluster.Leadership;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.event.Change;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080033import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
34import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
35import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
36import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
37import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
38import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
39import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
40import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
41import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080042import org.onosproject.store.service.AsyncLeaderElector;
Madan Jampanid7ff34d2016-05-26 18:20:55 -070043import com.google.common.collect.ImmutableSet;
Madan Jampani630e7ac2016-05-31 11:34:05 -070044import com.google.common.cache.CacheBuilder;
45import com.google.common.cache.CacheLoader;
46import com.google.common.cache.LoadingCache;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080047import com.google.common.collect.Sets;
48
49/**
50 * Distributed resource providing the {@link AsyncLeaderElector} primitive.
51 */
Madan Jampani966a5852016-04-01 10:39:57 -070052@ResourceTypeInfo(id = -152, factory = AtomixLeaderElectorFactory.class)
Madan Jampani65f24bb2016-03-15 15:16:18 -070053public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
Madan Jampanifc981772016-02-16 09:46:42 -080054 implements AsyncLeaderElector {
Madan Jampanid7ff34d2016-05-26 18:20:55 -070055 private final Set<Consumer<Status>> statusChangeListeners =
56 Sets.newCopyOnWriteArraySet();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080057 private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Madan Jampanid7ff34d2016-05-26 18:20:55 -070058 Sets.newCopyOnWriteArraySet();
Madan Jampani630e7ac2016-05-31 11:34:05 -070059 private final Consumer<Change<Leadership>> cacheUpdater;
60 private final Consumer<Status> statusListener;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080061
Madan Jampanic94b4852016-02-23 18:18:37 -080062 public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
Madan Jampani630e7ac2016-05-31 11:34:05 -070063 private final LoadingCache<String, CompletableFuture<Leadership>> cache;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080064
Madan Jampani65f24bb2016-03-15 15:16:18 -070065 public AtomixLeaderElector(CopycatClient client, Properties properties) {
66 super(client, properties);
Madan Jampani630e7ac2016-05-31 11:34:05 -070067 cache = CacheBuilder.newBuilder()
68 .maximumSize(1000)
Madan Jampani825a8b12016-06-06 19:42:01 -070069 .build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic))));
Madan Jampani630e7ac2016-05-31 11:34:05 -070070
71 cacheUpdater = change -> {
72 Leadership leadership = change.newValue();
73 cache.put(leadership.topic(), CompletableFuture.completedFuture(leadership));
74 };
75 statusListener = status -> {
76 if (status == Status.SUSPENDED || status == Status.INACTIVE) {
77 cache.invalidateAll();
78 }
79 };
80 addStatusChangeListener(statusListener);
81 }
82
83 @Override
84 public CompletableFuture<Void> destroy() {
85 removeStatusChangeListener(statusListener);
86 return removeChangeListener(cacheUpdater);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080087 }
88
89 @Override
90 public String name() {
91 return null;
92 }
93
94 @Override
95 public CompletableFuture<AtomixLeaderElector> open() {
96 return super.open().thenApply(result -> {
Madan Jampanifb786382016-06-13 10:25:35 -070097 client.onStateChange(state -> {
98 if (state == CopycatClient.State.CONNECTED && isListening()) {
99 client.submit(new Listen());
100 }
101 });
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800102 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800103 return result;
104 });
105 }
106
Madan Jampani630e7ac2016-05-31 11:34:05 -0700107 public CompletableFuture<AtomixLeaderElector> setupCache() {
108 return addChangeListener(cacheUpdater).thenApply(v -> this);
109 }
110
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800111 private void handleEvent(List<Change<Leadership>> changes) {
112 changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800113 }
114
115 @Override
116 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700117 return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800118 }
119
120 @Override
121 public CompletableFuture<Void> withdraw(String topic) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700122 return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800123 }
124
125 @Override
126 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700127 return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800128 }
129
130 @Override
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800131 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700132 return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800133 }
134
135 @Override
136 public CompletableFuture<Void> evict(NodeId nodeId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700137 return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800138 }
139
140 @Override
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800141 public CompletableFuture<Leadership> getLeadership(String topic) {
Madan Jampani77012442016-06-02 07:47:42 -0700142 return cache.getUnchecked(topic)
143 .whenComplete((r, e) -> {
144 if (e != null) {
145 cache.invalidate(topic);
146 }
147 });
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800148 }
149
150 @Override
151 public CompletableFuture<Map<String, Leadership>> getLeaderships() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700152 return client.submit(new GetAllLeaderships());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800153 }
154
155 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700156 return client.submit(new GetElectedTopics(nodeId));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800157 }
158
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800159 @Override
160 public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
161 if (leadershipChangeListeners.isEmpty()) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700162 return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800163 } else {
164 leadershipChangeListeners.add(consumer);
165 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800166 }
167 }
168
169 @Override
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800170 public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
Madan Jampani68b1f5a2016-04-05 19:17:58 -0700171 if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700172 return client.submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800173 }
174 return CompletableFuture.completedFuture(null);
175 }
Madan Jampanid7ff34d2016-05-26 18:20:55 -0700176
177 @Override
178 public void addStatusChangeListener(Consumer<Status> listener) {
179 statusChangeListeners.add(listener);
180 }
181
182 @Override
183 public void removeStatusChangeListener(Consumer<Status> listener) {
184 statusChangeListeners.remove(listener);
185 }
186
187 @Override
188 public Collection<Consumer<Status>> statusChangeListeners() {
189 return ImmutableSet.copyOf(statusChangeListeners);
190 }
Madan Jampanifb786382016-06-13 10:25:35 -0700191
192 private boolean isListening() {
193 return !leadershipChangeListeners.isEmpty();
194 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800195}