blob: 3e4509154aacb5c9d87636a6dff6a23c62be7e0e [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import io.atomix.copycat.client.CopycatClient;
Madan Jampani65f24bb2016-03-15 15:16:18 -070019import io.atomix.resource.AbstractResource;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.ResourceTypeInfo;
21
Madan Jampanid7ff34d2016-05-26 18:20:55 -070022import java.util.Collection;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080023import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080024import java.util.Map;
Madan Jampani65f24bb2016-03-15 15:16:18 -070025import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080026import java.util.Set;
27import java.util.concurrent.CompletableFuture;
28import java.util.function.Consumer;
Jon Hall7a8bfc62016-05-26 17:59:04 -070029import java.util.function.Function;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080030
31import org.onosproject.cluster.Leadership;
32import org.onosproject.cluster.NodeId;
33import org.onosproject.event.Change;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080034import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
35import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
36import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
37import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
38import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
39import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
40import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
41import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
42import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080043import org.onosproject.store.service.AsyncLeaderElector;
Madan Jampani8e18c0e2016-06-20 14:08:16 -070044
Madan Jampanid7ff34d2016-05-26 18:20:55 -070045import com.google.common.collect.ImmutableSet;
Madan Jampani630e7ac2016-05-31 11:34:05 -070046import com.google.common.cache.CacheBuilder;
47import com.google.common.cache.CacheLoader;
48import com.google.common.cache.LoadingCache;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080049import com.google.common.collect.Sets;
50
51/**
52 * Distributed resource providing the {@link AsyncLeaderElector} primitive.
53 */
Madan Jampani966a5852016-04-01 10:39:57 -070054@ResourceTypeInfo(id = -152, factory = AtomixLeaderElectorFactory.class)
Madan Jampani65f24bb2016-03-15 15:16:18 -070055public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
Madan Jampanifc981772016-02-16 09:46:42 -080056 implements AsyncLeaderElector {
Madan Jampanid7ff34d2016-05-26 18:20:55 -070057 private final Set<Consumer<Status>> statusChangeListeners =
58 Sets.newCopyOnWriteArraySet();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080059 private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Madan Jampanid7ff34d2016-05-26 18:20:55 -070060 Sets.newCopyOnWriteArraySet();
Madan Jampani630e7ac2016-05-31 11:34:05 -070061 private final Consumer<Change<Leadership>> cacheUpdater;
62 private final Consumer<Status> statusListener;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080063
Madan Jampanic94b4852016-02-23 18:18:37 -080064 public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
Madan Jampani630e7ac2016-05-31 11:34:05 -070065 private final LoadingCache<String, CompletableFuture<Leadership>> cache;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080066
Jon Hall7a8bfc62016-05-26 17:59:04 -070067 Function<CopycatClient.State, Status> mapper = state -> {
68 switch (state) {
69 case CONNECTED:
70 return Status.ACTIVE;
71 case SUSPENDED:
72 return Status.SUSPENDED;
73 case CLOSED:
74 return Status.INACTIVE;
75 default:
76 throw new IllegalStateException("Unknown state " + state);
77 }
78 };
79
Madan Jampani65f24bb2016-03-15 15:16:18 -070080 public AtomixLeaderElector(CopycatClient client, Properties properties) {
81 super(client, properties);
Madan Jampani630e7ac2016-05-31 11:34:05 -070082 cache = CacheBuilder.newBuilder()
83 .maximumSize(1000)
Madan Jampani825a8b12016-06-06 19:42:01 -070084 .build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic))));
Madan Jampani630e7ac2016-05-31 11:34:05 -070085
86 cacheUpdater = change -> {
87 Leadership leadership = change.newValue();
88 cache.put(leadership.topic(), CompletableFuture.completedFuture(leadership));
89 };
90 statusListener = status -> {
91 if (status == Status.SUSPENDED || status == Status.INACTIVE) {
92 cache.invalidateAll();
93 }
94 };
95 addStatusChangeListener(statusListener);
Jon Hall7a8bfc62016-05-26 17:59:04 -070096 client.onStateChange(this::handleStateChange);
Madan Jampani630e7ac2016-05-31 11:34:05 -070097 }
98
99 @Override
100 public CompletableFuture<Void> destroy() {
101 removeStatusChangeListener(statusListener);
102 return removeChangeListener(cacheUpdater);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800103 }
104
105 @Override
106 public String name() {
107 return null;
108 }
109
110 @Override
111 public CompletableFuture<AtomixLeaderElector> open() {
112 return super.open().thenApply(result -> {
Madan Jampanifb786382016-06-13 10:25:35 -0700113 client.onStateChange(state -> {
114 if (state == CopycatClient.State.CONNECTED && isListening()) {
115 client.submit(new Listen());
116 }
117 });
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800118 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800119 return result;
120 });
121 }
122
Madan Jampani630e7ac2016-05-31 11:34:05 -0700123 public CompletableFuture<AtomixLeaderElector> setupCache() {
124 return addChangeListener(cacheUpdater).thenApply(v -> this);
125 }
126
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800127 private void handleEvent(List<Change<Leadership>> changes) {
128 changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800129 }
130
131 @Override
132 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700133 return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800134 }
135
136 @Override
137 public CompletableFuture<Void> withdraw(String topic) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700138 return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800139 }
140
141 @Override
142 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700143 return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800144 }
145
146 @Override
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800147 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700148 return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800149 }
150
151 @Override
152 public CompletableFuture<Void> evict(NodeId nodeId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700153 return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800154 }
155
156 @Override
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800157 public CompletableFuture<Leadership> getLeadership(String topic) {
Madan Jampani77012442016-06-02 07:47:42 -0700158 return cache.getUnchecked(topic)
159 .whenComplete((r, e) -> {
160 if (e != null) {
161 cache.invalidate(topic);
162 }
163 });
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800164 }
165
166 @Override
167 public CompletableFuture<Map<String, Leadership>> getLeaderships() {
Madan Jampani825a8b12016-06-06 19:42:01 -0700168 return client.submit(new GetAllLeaderships());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800169 }
170
171 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700172 return client.submit(new GetElectedTopics(nodeId));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800173 }
174
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800175 @Override
176 public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
177 if (leadershipChangeListeners.isEmpty()) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700178 return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800179 } else {
180 leadershipChangeListeners.add(consumer);
181 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800182 }
183 }
184
185 @Override
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800186 public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
Madan Jampani68b1f5a2016-04-05 19:17:58 -0700187 if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
Madan Jampani825a8b12016-06-06 19:42:01 -0700188 return client.submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800189 }
190 return CompletableFuture.completedFuture(null);
191 }
Madan Jampanid7ff34d2016-05-26 18:20:55 -0700192
193 @Override
194 public void addStatusChangeListener(Consumer<Status> listener) {
195 statusChangeListeners.add(listener);
196 }
197
198 @Override
199 public void removeStatusChangeListener(Consumer<Status> listener) {
200 statusChangeListeners.remove(listener);
201 }
202
203 @Override
204 public Collection<Consumer<Status>> statusChangeListeners() {
205 return ImmutableSet.copyOf(statusChangeListeners);
206 }
Madan Jampanifb786382016-06-13 10:25:35 -0700207
208 private boolean isListening() {
209 return !leadershipChangeListeners.isEmpty();
210 }
Jon Hall7a8bfc62016-05-26 17:59:04 -0700211
212 private void handleStateChange(CopycatClient.State state) {
213 statusChangeListeners().forEach(listener -> listener.accept(mapper.apply(state)));
214 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800215}