blob: 69f43e261b6c61aa3c4583484b20fadd91b1cab9 [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
import org.onosproject.store.service.AsyncLeaderElector;

import com.google.common.collect.Sets;
44
45/**
46 * Distributed resource providing the {@link AsyncLeaderElector} primitive.
47 */
Madan Jampani966a5852016-04-01 10:39:57 -070048@ResourceTypeInfo(id = -152, factory = AtomixLeaderElectorFactory.class)
Madan Jampani65f24bb2016-03-15 15:16:18 -070049public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
Madan Jampanifc981772016-02-16 09:46:42 -080050 implements AsyncLeaderElector {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080051 private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Madan Jampani68b1f5a2016-04-05 19:17:58 -070052 Sets.newIdentityHashSet();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080053
Madan Jampanic94b4852016-02-23 18:18:37 -080054 public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055
Madan Jampani65f24bb2016-03-15 15:16:18 -070056 public AtomixLeaderElector(CopycatClient client, Properties properties) {
57 super(client, properties);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080058 }
59
60 @Override
61 public String name() {
62 return null;
63 }
64
65 @Override
66 public CompletableFuture<AtomixLeaderElector> open() {
67 return super.open().thenApply(result -> {
Madan Jampani0c0cdc62016-02-22 16:54:06 -080068 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080069 return result;
70 });
71 }
72
Madan Jampani0c0cdc62016-02-22 16:54:06 -080073 private void handleEvent(List<Change<Leadership>> changes) {
74 changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080075 }
76
77 @Override
78 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080079 return submit(new Run(topic, nodeId));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080080 }
81
82 @Override
83 public CompletableFuture<Void> withdraw(String topic) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080084 return submit(new Withdraw(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080085 }
86
87 @Override
88 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080089 return submit(new Anoint(topic, nodeId));
Madan Jampani5e5b3d62016-02-01 16:03:33 -080090 }
91
92 @Override
Madan Jampani0c0cdc62016-02-22 16:54:06 -080093 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -080094 return submit(new Promote(topic, nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -080095 }
96
97 @Override
98 public CompletableFuture<Void> evict(NodeId nodeId) {
99 return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
100 }
101
102 @Override
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800103 public CompletableFuture<Leadership> getLeadership(String topic) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800104 return submit(new GetLeadership(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800105 }
106
107 @Override
108 public CompletableFuture<Map<String, Leadership>> getLeaderships() {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800109 return submit(new GetAllLeaderships());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800110 }
111
112 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800113 return submit(new GetElectedTopics(nodeId));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800114 }
115
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800116 @Override
117 public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
118 if (leadershipChangeListeners.isEmpty()) {
119 return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
120 } else {
121 leadershipChangeListeners.add(consumer);
122 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800123 }
124 }
125
126 @Override
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800127 public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
Madan Jampani68b1f5a2016-04-05 19:17:58 -0700128 if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800129 return submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800130 }
131 return CompletableFuture.completedFuture(null);
132 }
133}