blob: 825fa9877d7fd33fffbde8320d5f561cc2f82682 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani0c0cdc62016-02-22 16:54:06 -080018import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080019import java.util.Map;
20import java.util.Set;
21import java.util.concurrent.CompletableFuture;
22import java.util.function.Consumer;
23
Madan Jampani630e7ac2016-05-31 11:34:05 -070024import com.google.common.cache.CacheBuilder;
25import com.google.common.cache.CacheLoader;
26import com.google.common.cache.LoadingCache;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080027import com.google.common.collect.Sets;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070028import io.atomix.protocols.raft.proxy.RaftProxy;
29import org.onlab.util.KryoNamespace;
30import org.onosproject.cluster.Leadership;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.event.Change;
33import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Anoint;
34import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GetElectedTopics;
35import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GetLeadership;
36import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Promote;
37import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Run;
38import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Withdraw;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.service.AsyncLeaderElector;
41import org.onosproject.store.service.Serializer;
42
43import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorEvents.CHANGE;
44import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.ADD_LISTENER;
45import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.ANOINT;
46import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.EVICT;
47import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_ALL_LEADERSHIPS;
48import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_ELECTED_TOPICS;
49import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_LEADERSHIP;
50import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.PROMOTE;
51import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.REMOVE_LISTENER;
52import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.RUN;
53import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.WITHDRAW;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054
55/**
56 * Distributed resource providing the {@link AsyncLeaderElector} primitive.
57 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070058public class AtomixLeaderElector extends AbstractRaftPrimitive implements AsyncLeaderElector {
59 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
60 .register(KryoNamespaces.API)
61 .register(AtomixLeaderElectorOperations.NAMESPACE)
62 .register(AtomixLeaderElectorEvents.NAMESPACE)
63 .build());
64
65 private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners = Sets.newCopyOnWriteArraySet();
Madan Jampani630e7ac2016-05-31 11:34:05 -070066 private final Consumer<Change<Leadership>> cacheUpdater;
67 private final Consumer<Status> statusListener;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080068
Madan Jampani630e7ac2016-05-31 11:34:05 -070069 private final LoadingCache<String, CompletableFuture<Leadership>> cache;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080070
Jordan Halterman2bf177c2017-06-29 01:49:08 -070071 public AtomixLeaderElector(RaftProxy proxy) {
72 super(proxy);
Madan Jampani630e7ac2016-05-31 11:34:05 -070073 cache = CacheBuilder.newBuilder()
74 .maximumSize(1000)
Jordan Halterman2bf177c2017-06-29 01:49:08 -070075 .build(CacheLoader.from(topic -> proxy.invoke(
76 GET_LEADERSHIP, SERIALIZER::encode, new GetLeadership(topic), SERIALIZER::decode)));
Madan Jampani630e7ac2016-05-31 11:34:05 -070077
78 cacheUpdater = change -> {
79 Leadership leadership = change.newValue();
80 cache.put(leadership.topic(), CompletableFuture.completedFuture(leadership));
81 };
82 statusListener = status -> {
83 if (status == Status.SUSPENDED || status == Status.INACTIVE) {
84 cache.invalidateAll();
85 }
86 };
87 addStatusChangeListener(statusListener);
Jordan Halterman2bf177c2017-06-29 01:49:08 -070088
89 proxy.addStateChangeListener(state -> {
90 if (state == RaftProxy.State.CONNECTED && isListening()) {
91 proxy.invoke(ADD_LISTENER);
92 }
93 });
94 proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
Madan Jampani630e7ac2016-05-31 11:34:05 -070095 }
96
97 @Override
98 public CompletableFuture<Void> destroy() {
99 removeStatusChangeListener(statusListener);
100 return removeChangeListener(cacheUpdater);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800101 }
102
Madan Jampani630e7ac2016-05-31 11:34:05 -0700103 public CompletableFuture<AtomixLeaderElector> setupCache() {
104 return addChangeListener(cacheUpdater).thenApply(v -> this);
105 }
106
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800107 private void handleEvent(List<Change<Leadership>> changes) {
108 changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800109 }
110
111 @Override
112 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700113 return proxy.<Run, Leadership>invoke(RUN, SERIALIZER::encode, new Run(topic, nodeId), SERIALIZER::decode)
114 .whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800115 }
116
117 @Override
118 public CompletableFuture<Void> withdraw(String topic) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700119 return proxy.invoke(WITHDRAW, SERIALIZER::encode, new Withdraw(topic))
120 .whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800121 }
122
123 @Override
124 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700125 return proxy.<Anoint, Boolean>invoke(ANOINT, SERIALIZER::encode, new Anoint(topic, nodeId), SERIALIZER::decode)
126 .whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800127 }
128
129 @Override
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800130 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700131 return proxy.<Promote, Boolean>invoke(
132 PROMOTE, SERIALIZER::encode, new Promote(topic, nodeId), SERIALIZER::decode)
133 .whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800134 }
135
136 @Override
137 public CompletableFuture<Void> evict(NodeId nodeId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700138 return proxy.invoke(EVICT, SERIALIZER::encode, new AtomixLeaderElectorOperations.Evict(nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800139 }
140
141 @Override
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800142 public CompletableFuture<Leadership> getLeadership(String topic) {
Madan Jampani77012442016-06-02 07:47:42 -0700143 return cache.getUnchecked(topic)
144 .whenComplete((r, e) -> {
145 if (e != null) {
146 cache.invalidate(topic);
147 }
148 });
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800149 }
150
151 @Override
152 public CompletableFuture<Map<String, Leadership>> getLeaderships() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700153 return proxy.invoke(GET_ALL_LEADERSHIPS, SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800154 }
155
156 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700157 return proxy.invoke(GET_ELECTED_TOPICS, SERIALIZER::encode, new GetElectedTopics(nodeId), SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800158 }
159
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800160 @Override
161 public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
162 if (leadershipChangeListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700163 return proxy.invoke(ADD_LISTENER).thenRun(() -> leadershipChangeListeners.add(consumer));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800164 } else {
165 leadershipChangeListeners.add(consumer);
166 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800167 }
168 }
169
170 @Override
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800171 public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
Madan Jampani68b1f5a2016-04-05 19:17:58 -0700172 if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700173 return proxy.invoke(REMOVE_LISTENER).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800174 }
175 return CompletableFuture.completedFuture(null);
176 }
Madan Jampanid7ff34d2016-05-26 18:20:55 -0700177
Madan Jampanifb786382016-06-13 10:25:35 -0700178 private boolean isListening() {
179 return !leadershipChangeListeners.isEmpty();
180 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700181}