blob: 389f026e5d9d15f63baef0a61834533395fd2b5e [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import io.atomix.catalyst.util.Listener;
19import io.atomix.copycat.client.CopycatClient;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.Resource;
21import io.atomix.resource.ResourceTypeInfo;
22
Madan Jampani0c0cdc62016-02-22 16:54:06 -080023import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080024import java.util.Map;
25import java.util.Set;
26import java.util.concurrent.CompletableFuture;
27import java.util.function.Consumer;
28
29import org.onosproject.cluster.Leadership;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.event.Change;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080032import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
33import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
34import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
35import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
36import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
37import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
38import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
39import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
40import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080041import org.onosproject.store.service.AsyncLeaderElector;
42
43import com.google.common.collect.Sets;
44
45/**
46 * Distributed resource providing the {@link AsyncLeaderElector} primitive.
47 */
Madan Jampani86cb2432016-02-17 11:07:56 -080048@ResourceTypeInfo(id = -152,
49 stateMachine = AtomixLeaderElectorState.class,
50 typeResolver = AtomixLeaderElectorCommands.TypeResolver.class)
51public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
Madan Jampanifc981772016-02-16 09:46:42 -080052 implements AsyncLeaderElector {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080053 private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
54 Sets.newConcurrentHashSet();
55
Madan Jampanic94b4852016-02-23 18:18:37 -080056 public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080057 private Listener<Change<Leadership>> listener;
58
59 public AtomixLeaderElector(CopycatClient client, Resource.Options options) {
60 super(client, options);
61 }
62
63 @Override
64 public String name() {
65 return null;
66 }
67
68 @Override
69 public CompletableFuture<AtomixLeaderElector> open() {
70 return super.open().thenApply(result -> {
Madan Jampani0c0cdc62016-02-22 16:54:06 -080071 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080072 return result;
73 });
74 }
75
Madan Jampani0c0cdc62016-02-22 16:54:06 -080076 private void handleEvent(List<Change<Leadership>> changes) {
77 changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080078 }
79
80 @Override
81 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080082 return submit(new Run(topic, nodeId));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080083 }
84
85 @Override
86 public CompletableFuture<Void> withdraw(String topic) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080087 return submit(new Withdraw(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080088 }
89
90 @Override
91 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080092 return submit(new Anoint(topic, nodeId));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080093 }
94
95 @Override
Madan Jampani0c0cdc62016-02-22 16:54:06 -080096 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080097 return submit(new Promote(topic, nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -080098 }
99
100 @Override
101 public CompletableFuture<Void> evict(NodeId nodeId) {
102 return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
103 }
104
105 @Override
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800106 public CompletableFuture<Leadership> getLeadership(String topic) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800107 return submit(new GetLeadership(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800108 }
109
110 @Override
111 public CompletableFuture<Map<String, Leadership>> getLeaderships() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800112 return submit(new GetAllLeaderships());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800113 }
114
115 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800116 return submit(new GetElectedTopics(nodeId));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800117 }
118
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800119 @Override
120 public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
121 if (leadershipChangeListeners.isEmpty()) {
122 return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
123 } else {
124 leadershipChangeListeners.add(consumer);
125 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800126 }
127 }
128
129 @Override
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800130 public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
131 if (leadershipChangeListeners.remove(listener) && leadershipChangeListeners.isEmpty()) {
132 return submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800133 }
134 return CompletableFuture.completedFuture(null);
135 }
136}