blob: be5d9ca8ed280eba2a77e59096fa23c039b306b9 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import io.atomix.copycat.client.CopycatClient;
Madan Jampani65f24bb2016-03-15 15:16:18 -070019import io.atomix.resource.AbstractResource;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.resource.ResourceTypeInfo;
21
Madan Jampanid7ff34d2016-05-26 18:20:55 -070022import java.util.Collection;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080023import java.util.List;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080024import java.util.Map;
Madan Jampani65f24bb2016-03-15 15:16:18 -070025import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080026import java.util.Set;
27import java.util.concurrent.CompletableFuture;
28import java.util.function.Consumer;
29
30import org.onosproject.cluster.Leadership;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.event.Change;
Madan Jampani27a2a3d2016-03-03 08:46:24 -080033import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
34import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
35import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
36import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
37import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
38import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
39import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
40import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
41import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080042import org.onosproject.store.service.AsyncLeaderElector;
43
Madan Jampanid7ff34d2016-05-26 18:20:55 -070044import com.google.common.collect.ImmutableSet;
Madan Jampani630e7ac2016-05-31 11:34:05 -070045import com.google.common.cache.CacheBuilder;
46import com.google.common.cache.CacheLoader;
47import com.google.common.cache.LoadingCache;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080048import com.google.common.collect.Sets;
49
50/**
51 * Distributed resource providing the {@link AsyncLeaderElector} primitive.
52 */
Madan Jampani966a5852016-04-01 10:39:57 -070053@ResourceTypeInfo(id = -152, factory = AtomixLeaderElectorFactory.class)
Madan Jampani65f24bb2016-03-15 15:16:18 -070054public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
Madan Jampanifc981772016-02-16 09:46:42 -080055 implements AsyncLeaderElector {
Madan Jampanid7ff34d2016-05-26 18:20:55 -070056 private final Set<Consumer<Status>> statusChangeListeners =
57 Sets.newCopyOnWriteArraySet();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080058 private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Madan Jampanid7ff34d2016-05-26 18:20:55 -070059 Sets.newCopyOnWriteArraySet();
Madan Jampani630e7ac2016-05-31 11:34:05 -070060 private final Consumer<Change<Leadership>> cacheUpdater;
61 private final Consumer<Status> statusListener;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080062
Madan Jampanic94b4852016-02-23 18:18:37 -080063 public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
Madan Jampani630e7ac2016-05-31 11:34:05 -070064 private final LoadingCache<String, CompletableFuture<Leadership>> cache;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080065
Madan Jampani65f24bb2016-03-15 15:16:18 -070066 public AtomixLeaderElector(CopycatClient client, Properties properties) {
67 super(client, properties);
Madan Jampani630e7ac2016-05-31 11:34:05 -070068 cache = CacheBuilder.newBuilder()
69 .maximumSize(1000)
70 .build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic))));
71
72 cacheUpdater = change -> {
73 Leadership leadership = change.newValue();
74 cache.put(leadership.topic(), CompletableFuture.completedFuture(leadership));
75 };
76 statusListener = status -> {
77 if (status == Status.SUSPENDED || status == Status.INACTIVE) {
78 cache.invalidateAll();
79 }
80 };
81 addStatusChangeListener(statusListener);
82 }
83
84 @Override
85 public CompletableFuture<Void> destroy() {
86 removeStatusChangeListener(statusListener);
87 return removeChangeListener(cacheUpdater);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080088 }
89
90 @Override
91 public String name() {
92 return null;
93 }
94
95 @Override
96 public CompletableFuture<AtomixLeaderElector> open() {
97 return super.open().thenApply(result -> {
Madan Jampani0c0cdc62016-02-22 16:54:06 -080098 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080099 return result;
100 });
101 }
102
Madan Jampani630e7ac2016-05-31 11:34:05 -0700103 public CompletableFuture<AtomixLeaderElector> setupCache() {
104 return addChangeListener(cacheUpdater).thenApply(v -> this);
105 }
106
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800107 private void handleEvent(List<Change<Leadership>> changes) {
108 changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800109 }
110
111 @Override
112 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700113 return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800114 }
115
116 @Override
117 public CompletableFuture<Void> withdraw(String topic) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700118 return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800119 }
120
121 @Override
122 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700123 return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800124 }
125
126 @Override
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800127 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700128 return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800129 }
130
131 @Override
132 public CompletableFuture<Void> evict(NodeId nodeId) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700133 return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800134 }
135
136 @Override
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800137 public CompletableFuture<Leadership> getLeadership(String topic) {
Madan Jampani77012442016-06-02 07:47:42 -0700138 return cache.getUnchecked(topic)
139 .whenComplete((r, e) -> {
140 if (e != null) {
141 cache.invalidate(topic);
142 }
143 });
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800144 }
145
146 @Override
147 public CompletableFuture<Map<String, Leadership>> getLeaderships() {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700148 return client.submit(new GetAllLeaderships());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800149 }
150
151 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700152 return client.submit(new GetElectedTopics(nodeId));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800153 }
154
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800155 @Override
156 public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
157 if (leadershipChangeListeners.isEmpty()) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700158 return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800159 } else {
160 leadershipChangeListeners.add(consumer);
161 return CompletableFuture.completedFuture(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800162 }
163 }
164
165 @Override
Madan Jampani27a2a3d2016-03-03 08:46:24 -0800166 public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
Madan Jampani68b1f5a2016-04-05 19:17:58 -0700167 if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700168 return client.submit(new Unlisten()).thenApply(v -> null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800169 }
170 return CompletableFuture.completedFuture(null);
171 }
Madan Jampanid7ff34d2016-05-26 18:20:55 -0700172
173 @Override
174 public void addStatusChangeListener(Consumer<Status> listener) {
175 statusChangeListeners.add(listener);
176 }
177
178 @Override
179 public void removeStatusChangeListener(Consumer<Status> listener) {
180 statusChangeListeners.remove(listener);
181 }
182
183 @Override
184 public Collection<Consumer<Status>> statusChangeListeners() {
185 return ImmutableSet.copyOf(statusChangeListeners);
186 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800187}