blob: f4a42521449f0e8241d2793f9f419cfe5a13815e [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import io.atomix.copycat.client.CopycatClient;
Madan Jampani65f24bb2016-03-15 15:16:18 -070019import io.atomix.resource.AbstractResource;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.ResourceTypeInfo;
21
Madan Jampanid7ff34d2016-05-26 18:20:55 -070022import java.util.Collection;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080023import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080024import java.util.Map;
Madan Jampani65f24bb2016-03-15 15:16:18 -070025import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080026import java.util.Set;
27import java.util.concurrent.CompletableFuture;
28import java.util.function.Consumer;
29
30import org.onosproject.cluster.Leadership;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.event.Change;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080033import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
34import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
35import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
36import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
37import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
38import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
39import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
40import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
41import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080042import org.onosproject.store.service.AsyncLeaderElector;
43
Madan Jampanid7ff34d2016-05-26 18:20:55 -070044import com.google.common.collect.ImmutableSet;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080045import com.google.common.collect.Sets;
46
47/**
48 * Distributed resource providing the {@link AsyncLeaderElector} primitive.
49 */
Madan Jampani966a5852016-04-01 10:39:57 -070050@ResourceTypeInfo(id = -152, factory = AtomixLeaderElectorFactory.class)
Madan Jampani65f24bb2016-03-15 15:16:18 -070051public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
Madan Jampanifc981772016-02-16 09:46:42 -080052 implements AsyncLeaderElector {
Madan Jampanid7ff34d2016-05-26 18:20:55 -070053 private final Set<Consumer<Status>> statusChangeListeners =
54 Sets.newCopyOnWriteArraySet();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055 private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Madan Jampanid7ff34d2016-05-26 18:20:55 -070056 Sets.newCopyOnWriteArraySet();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080057
Madan Jampanic94b4852016-02-23 18:18:37 -080058 public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080059
Madan Jampani65f24bb2016-03-15 15:16:18 -070060 public AtomixLeaderElector(CopycatClient client, Properties properties) {
61 super(client, properties);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080062 }
63
64 @Override
65 public String name() {
66 return null;
67 }
68
69 @Override
70 public CompletableFuture<AtomixLeaderElector> open() {
71 return super.open().thenApply(result -> {
Madan Jampani0c0cdc62016-02-22 16:54:06 -080072 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080073 return result;
74 });
75 }
76
Madan Jampani0c0cdc62016-02-22 16:54:06 -080077 private void handleEvent(List<Change<Leadership>> changes) {
78 changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080079 }
80
81 @Override
82 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080083 return submit(new Run(topic, nodeId));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080084 }
85
86 @Override
87 public CompletableFuture<Void> withdraw(String topic) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080088 return submit(new Withdraw(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080089 }
90
91 @Override
92 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080093 return submit(new Anoint(topic, nodeId));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080094 }
95
96 @Override
Madan Jampani0c0cdc62016-02-22 16:54:06 -080097 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080098 return submit(new Promote(topic, nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -080099 }
100
101 @Override
102 public CompletableFuture<Void> evict(NodeId nodeId) {
103 return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
104 }
105
106 @Override
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800107 public CompletableFuture<Leadership> getLeadership(String topic) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800108 return submit(new GetLeadership(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800109 }
110
111 @Override
112 public CompletableFuture<Map<String, Leadership>> getLeaderships() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800113 return submit(new GetAllLeaderships());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800114 }
115
116 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800117 return submit(new GetElectedTopics(nodeId));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800118 }
119
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800120 @Override
121 public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
122 if (leadershipChangeListeners.isEmpty()) {
123 return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
124 } else {
125 leadershipChangeListeners.add(consumer);
126 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800127 }
128 }
129
130 @Override
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800131 public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
Madan Jampani68b1f5a2016-04-05 19:17:58 -0700132 if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800133 return submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800134 }
135 return CompletableFuture.completedFuture(null);
136 }
Madan Jampanid7ff34d2016-05-26 18:20:55 -0700137
138 @Override
139 public void addStatusChangeListener(Consumer<Status> listener) {
140 statusChangeListeners.add(listener);
141 }
142
143 @Override
144 public void removeStatusChangeListener(Consumer<Status> listener) {
145 statusChangeListeners.remove(listener);
146 }
147
148 @Override
149 public Collection<Consumer<Status>> statusChangeListeners() {
150 return ImmutableSet.copyOf(statusChangeListeners);
151 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800152}