/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani0c0cdc62016-02-22 16:54:06 -080018import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080019import java.util.Map;
20import java.util.Set;
21import java.util.concurrent.CompletableFuture;
22import java.util.function.Consumer;
23
Madan Jampani5e5b3d62016-02-01 16:03:33 -080024import com.google.common.collect.Sets;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070025import io.atomix.protocols.raft.proxy.RaftProxy;
26import org.onlab.util.KryoNamespace;
27import org.onosproject.cluster.Leadership;
28import org.onosproject.cluster.NodeId;
29import org.onosproject.event.Change;
30import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Anoint;
31import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GetElectedTopics;
32import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GetLeadership;
33import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Promote;
34import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Run;
35import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Withdraw;
36import org.onosproject.store.serializers.KryoNamespaces;
37import org.onosproject.store.service.AsyncLeaderElector;
38import org.onosproject.store.service.Serializer;
39
40import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorEvents.CHANGE;
41import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.ADD_LISTENER;
42import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.ANOINT;
43import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.EVICT;
44import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_ALL_LEADERSHIPS;
45import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_ELECTED_TOPICS;
46import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_LEADERSHIP;
47import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.PROMOTE;
48import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.REMOVE_LISTENER;
49import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.RUN;
50import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.WITHDRAW;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080051
52/**
53 * Distributed resource providing the {@link AsyncLeaderElector} primitive.
54 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070055public class AtomixLeaderElector extends AbstractRaftPrimitive implements AsyncLeaderElector {
56 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
57 .register(KryoNamespaces.API)
58 .register(AtomixLeaderElectorOperations.NAMESPACE)
59 .register(AtomixLeaderElectorEvents.NAMESPACE)
60 .build());
61
62 private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners = Sets.newCopyOnWriteArraySet();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080063
Jordan Halterman2bf177c2017-06-29 01:49:08 -070064 public AtomixLeaderElector(RaftProxy proxy) {
65 super(proxy);
Jordan Halterman2bf177c2017-06-29 01:49:08 -070066 proxy.addStateChangeListener(state -> {
67 if (state == RaftProxy.State.CONNECTED && isListening()) {
68 proxy.invoke(ADD_LISTENER);
69 }
70 });
71 proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
Madan Jampani630e7ac2016-05-31 11:34:05 -070072 }
73
Madan Jampani0c0cdc62016-02-22 16:54:06 -080074 private void handleEvent(List<Change<Leadership>> changes) {
75 changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080076 }
77
78 @Override
79 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
Jordan Halterman46c5eaa2018-01-24 16:46:55 -080080 return proxy.<Run, Leadership>invoke(RUN, SERIALIZER::encode, new Run(topic, nodeId), SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080081 }
82
83 @Override
84 public CompletableFuture<Void> withdraw(String topic) {
Jordan Halterman46c5eaa2018-01-24 16:46:55 -080085 return proxy.invoke(WITHDRAW, SERIALIZER::encode, new Withdraw(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080086 }
87
88 @Override
89 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
Jordan Halterman46c5eaa2018-01-24 16:46:55 -080090 return proxy.<Anoint, Boolean>invoke(ANOINT, SERIALIZER::encode, new Anoint(topic, nodeId), SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080091 }
92
93 @Override
Madan Jampani0c0cdc62016-02-22 16:54:06 -080094 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -070095 return proxy.<Promote, Boolean>invoke(
Jordan Halterman46c5eaa2018-01-24 16:46:55 -080096 PROMOTE, SERIALIZER::encode, new Promote(topic, nodeId), SERIALIZER::decode);
Madan Jampani0c0cdc62016-02-22 16:54:06 -080097 }
98
99 @Override
100 public CompletableFuture<Void> evict(NodeId nodeId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700101 return proxy.invoke(EVICT, SERIALIZER::encode, new AtomixLeaderElectorOperations.Evict(nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800102 }
103
104 @Override
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800105 public CompletableFuture<Leadership> getLeadership(String topic) {
Jordan Halterman46c5eaa2018-01-24 16:46:55 -0800106 return proxy.invoke(GET_LEADERSHIP, SERIALIZER::encode, new GetLeadership(topic), SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800107 }
108
109 @Override
110 public CompletableFuture<Map<String, Leadership>> getLeaderships() {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700111 return proxy.invoke(GET_ALL_LEADERSHIPS, SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800112 }
113
114 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700115 return proxy.invoke(GET_ELECTED_TOPICS, SERIALIZER::encode, new GetElectedTopics(nodeId), SERIALIZER::decode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800116 }
117
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800118 @Override
119 public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
120 if (leadershipChangeListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700121 return proxy.invoke(ADD_LISTENER).thenRun(() -> leadershipChangeListeners.add(consumer));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800122 } else {
123 leadershipChangeListeners.add(consumer);
124 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800125 }
126 }
127
128 @Override
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800129 public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
Madan Jampani68b1f5a2016-04-05 19:17:58 -0700130 if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700131 return proxy.invoke(REMOVE_LISTENER).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800132 }
133 return CompletableFuture.completedFuture(null);
134 }
Madan Jampanid7ff34d2016-05-26 18:20:55 -0700135
Madan Jampanifb786382016-06-13 10:25:35 -0700136 private boolean isListening() {
137 return !leadershipChangeListeners.isEmpty();
138 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700139}