blob: 542c3755ec4d3aca8c75ce80191340362beb67d7 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import java.util.Arrays;
19import java.util.HashMap;
20import java.util.LinkedHashMap;
21import java.util.LinkedList;
22import java.util.List;
23import java.util.Map;
24import java.util.Optional;
25import java.util.Set;
26import java.util.concurrent.atomic.AtomicLong;
27import java.util.function.Supplier;
28import java.util.stream.Collectors;
29
Jordan Halterman2bf177c2017-06-29 01:49:08 -070030import com.google.common.base.MoreObjects;
31import com.google.common.base.Objects;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070032import com.google.common.collect.ImmutableSet;
33import com.google.common.collect.Lists;
34import com.google.common.collect.Maps;
35import com.google.common.collect.Sets;
36import io.atomix.protocols.raft.service.AbstractRaftService;
37import io.atomix.protocols.raft.service.Commit;
38import io.atomix.protocols.raft.service.RaftServiceExecutor;
39import io.atomix.protocols.raft.session.RaftSession;
40import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
41import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
42import org.onlab.util.KryoNamespace;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080043import org.onosproject.cluster.Leader;
44import org.onosproject.cluster.Leadership;
45import org.onosproject.cluster.NodeId;
46import org.onosproject.event.Change;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070047import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Anoint;
48import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Evict;
49import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GetElectedTopics;
50import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GetLeadership;
51import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Promote;
52import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Run;
53import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080054import org.onosproject.store.service.Serializer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055
Jordan Halterman2bf177c2017-06-29 01:49:08 -070056import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorEvents.CHANGE;
57import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.ADD_LISTENER;
58import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.ANOINT;
59import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.EVICT;
60import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_ALL_LEADERSHIPS;
61import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_ELECTED_TOPICS;
62import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_LEADERSHIP;
63import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.PROMOTE;
64import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.REMOVE_LISTENER;
65import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.RUN;
66import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.WITHDRAW;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080067
68/**
69 * State machine for {@link AtomixLeaderElector} resource.
70 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070071public class AtomixLeaderElectorService extends AbstractRaftService {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080072
Jordan Halterman2bf177c2017-06-29 01:49:08 -070073 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
74 .register(AtomixLeaderElectorOperations.NAMESPACE)
75 .register(AtomixLeaderElectorEvents.NAMESPACE)
76 .register(ElectionState.class)
77 .register(Registration.class)
78 .register(new LinkedHashMap<>().keySet().getClass())
79 .build());
80
Madan Jampani5e5b3d62016-02-01 16:03:33 -080081 private Map<String, AtomicLong> termCounters = new HashMap<>();
82 private Map<String, ElectionState> elections = new HashMap<>();
Jordan Halterman2bf177c2017-06-29 01:49:08 -070083 private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080084
Jordan Halterman2bf177c2017-06-29 01:49:08 -070085 @Override
86 public void snapshot(SnapshotWriter writer) {
87 writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
88 writer.writeObject(termCounters, SERIALIZER::encode);
89 writer.writeObject(elections, SERIALIZER::encode);
Jordan Halterman6807c8f2017-08-28 20:58:24 -070090 logger().debug("Took state machine snapshot");
Madan Jampani86cb2432016-02-17 11:07:56 -080091 }
92
Madan Jampani5e5b3d62016-02-01 16:03:33 -080093 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -070094 public void install(SnapshotReader reader) {
95 listeners = new LinkedHashMap<>();
96 for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -070097 listeners.put(sessionId, sessions().getSession(sessionId));
Jordan Halterman2bf177c2017-06-29 01:49:08 -070098 }
99 termCounters = reader.readObject(SERIALIZER::decode);
100 elections = reader.readObject(SERIALIZER::decode);
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700101 logger().debug("Reinstated state machine from snapshot");
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700102 }
103
104 @Override
105 protected void configure(RaftServiceExecutor executor) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800106 // Notification
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700107 executor.register(ADD_LISTENER, this::listen);
108 executor.register(REMOVE_LISTENER, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800109 // Commands
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700110 executor.register(RUN, SERIALIZER::decode, this::run, SERIALIZER::encode);
111 executor.register(WITHDRAW, SERIALIZER::decode, this::withdraw);
112 executor.register(ANOINT, SERIALIZER::decode, this::anoint, SERIALIZER::encode);
113 executor.register(PROMOTE, SERIALIZER::decode, this::promote, SERIALIZER::encode);
114 executor.register(EVICT, SERIALIZER::decode, this::evict);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800115 // Queries
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700116 executor.register(GET_LEADERSHIP, SERIALIZER::decode, this::getLeadership, SERIALIZER::encode);
117 executor.register(GET_ALL_LEADERSHIPS, this::allLeaderships, SERIALIZER::encode);
118 executor.register(GET_ELECTED_TOPICS, SERIALIZER::decode, this::electedTopics, SERIALIZER::encode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800119 }
120
121 private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800122 notifyLeadershipChanges(Lists.newArrayList(new Change<>(previousLeadership, newLeadership)));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800123 }
124
125 private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
126 if (changes.isEmpty()) {
127 return;
128 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700129 listeners.values().forEach(session -> session.publish(CHANGE, SERIALIZER::encode, changes));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800130 }
131
132 /**
133 * Applies listen commits.
134 *
135 * @param commit listen commit
136 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700137 public void listen(Commit<Void> commit) {
138 listeners.put(commit.session().sessionId().id(), commit.session());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800139 }
140
141 /**
142 * Applies unlisten commits.
143 *
144 * @param commit unlisten commit
145 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700146 public void unlisten(Commit<Void> commit) {
147 listeners.remove(commit.session().sessionId().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800148 }
149
150 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700151 * Applies an {@link AtomixLeaderElectorOperations.Run} commit.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800152 * @param commit commit entry
153 * @return topic leader. If no previous leader existed this is the node that just entered the race.
154 */
Madan Jampanifc981772016-02-16 09:46:42 -0800155 public Leadership run(Commit<? extends Run> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800156 try {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700157 String topic = commit.value().topic();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800158 Leadership oldLeadership = leadership(topic);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700159 Registration registration = new Registration(commit.value().nodeId(), commit.session().sessionId().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800160 elections.compute(topic, (k, v) -> {
161 if (v == null) {
Jordan Halterman48d75d62018-02-15 22:24:57 -0800162 return new ElectionState(registration, termCounter(topic)::incrementAndGet);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800163 } else {
164 if (!v.isDuplicate(registration)) {
Jordan Halterman48d75d62018-02-15 22:24:57 -0800165 return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800166 } else {
167 return v;
168 }
169 }
170 });
171 Leadership newLeadership = leadership(topic);
172
173 if (!Objects.equal(oldLeadership, newLeadership)) {
174 notifyLeadershipChange(oldLeadership, newLeadership);
175 }
176 return newLeadership;
Madan Jampaniea98f412016-06-22 09:05:40 -0700177 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700178 logger().error("State machine operation failed", e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800179 throw new IllegalStateException(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800180 }
181 }
182
183 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700184 * Applies an {@link AtomixLeaderElectorOperations.Withdraw} commit.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800185 * @param commit withdraw commit
186 */
Madan Jampanifc981772016-02-16 09:46:42 -0800187 public void withdraw(Commit<? extends Withdraw> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800188 try {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700189 String topic = commit.value().topic();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800190 Leadership oldLeadership = leadership(topic);
Jordan Halterman48d75d62018-02-15 22:24:57 -0800191 elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
192 termCounter(topic)::incrementAndGet));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800193 Leadership newLeadership = leadership(topic);
194 if (!Objects.equal(oldLeadership, newLeadership)) {
195 notifyLeadershipChange(oldLeadership, newLeadership);
196 }
Madan Jampaniea98f412016-06-22 09:05:40 -0700197 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700198 logger().error("State machine operation failed", e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800199 throw new IllegalStateException(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800200 }
201 }
202
203 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700204 * Applies an {@link AtomixLeaderElectorOperations.Anoint} commit.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800205 * @param commit anoint commit
206 * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
207 */
Madan Jampanifc981772016-02-16 09:46:42 -0800208 public boolean anoint(Commit<? extends Anoint> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800209 try {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700210 String topic = commit.value().topic();
211 NodeId nodeId = commit.value().nodeId();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800212 Leadership oldLeadership = leadership(topic);
213 ElectionState electionState = elections.computeIfPresent(topic,
Madan Jampanic94b4852016-02-23 18:18:37 -0800214 (k, v) -> v.transferLeadership(nodeId, termCounter(topic)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800215 Leadership newLeadership = leadership(topic);
216 if (!Objects.equal(oldLeadership, newLeadership)) {
217 notifyLeadershipChange(oldLeadership, newLeadership);
218 }
219 return (electionState != null &&
220 electionState.leader() != null &&
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700221 commit.value().nodeId().equals(electionState.leader().nodeId()));
Madan Jampaniea98f412016-06-22 09:05:40 -0700222 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700223 logger().error("State machine operation failed", e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800224 throw new IllegalStateException(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800225 }
226 }
227
228 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700229 * Applies an {@link AtomixLeaderElectorOperations.Promote} commit.
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800230 * @param commit promote commit
231 * @return {@code true} if changes desired end state is achieved.
232 */
233 public boolean promote(Commit<? extends Promote> commit) {
234 try {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700235 String topic = commit.value().topic();
236 NodeId nodeId = commit.value().nodeId();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800237 Leadership oldLeadership = leadership(topic);
238 if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
239 return false;
240 }
Madan Jampanic94b4852016-02-23 18:18:37 -0800241 elections.computeIfPresent(topic, (k, v) -> v.promote(nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800242 Leadership newLeadership = leadership(topic);
243 if (!Objects.equal(oldLeadership, newLeadership)) {
244 notifyLeadershipChange(oldLeadership, newLeadership);
245 }
246 return true;
Madan Jampaniea98f412016-06-22 09:05:40 -0700247 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700248 logger().error("State machine operation failed", e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800249 throw new IllegalStateException(e);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800250 }
251 }
252
253 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700254 * Applies an {@link AtomixLeaderElectorOperations.Evict} commit.
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800255 * @param commit evict commit
256 */
257 public void evict(Commit<? extends Evict> commit) {
258 try {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800259 List<Change<Leadership>> changes = Lists.newArrayList();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700260 NodeId nodeId = commit.value().nodeId();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800261 Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
262 topics.forEach(topic -> {
263 Leadership oldLeadership = leadership(topic);
264 elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
265 Leadership newLeadership = leadership(topic);
266 if (!Objects.equal(oldLeadership, newLeadership)) {
267 changes.add(new Change<>(oldLeadership, newLeadership));
268 }
269 });
270 notifyLeadershipChanges(changes);
Madan Jampaniea98f412016-06-22 09:05:40 -0700271 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700272 logger().error("State machine operation failed", e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800273 throw new IllegalStateException(e);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800274 }
275 }
276
277 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700278 * Applies an {@link AtomixLeaderElectorOperations.GetLeadership} commit.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800279 * @param commit GetLeadership commit
280 * @return leader
281 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700282 public Leadership getLeadership(Commit<? extends GetLeadership> commit) {
283 String topic = commit.value().topic();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800284 try {
285 return leadership(topic);
Madan Jampaniea98f412016-06-22 09:05:40 -0700286 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700287 logger().error("State machine operation failed", e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800288 throw new IllegalStateException(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800289 }
290 }
291
292 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700293 * Applies an {@link AtomixLeaderElectorOperations.GetElectedTopics} commit.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800294 * @param commit commit entry
295 * @return set of topics for which the node is the leader
296 */
Madan Jampanifc981772016-02-16 09:46:42 -0800297 public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800298 try {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700299 NodeId nodeId = commit.value().nodeId();
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700300 return ImmutableSet.copyOf(Maps.filterEntries(elections, e -> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800301 Leader leader = leadership(e.getKey()).leader();
302 return leader != null && leader.nodeId().equals(nodeId);
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700303 }).keySet());
Madan Jampaniea98f412016-06-22 09:05:40 -0700304 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700305 logger().error("State machine operation failed", e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800306 throw new IllegalStateException(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800307 }
308 }
309
310 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700311 * Applies an {@link AtomixLeaderElectorOperations#GET_ALL_LEADERSHIPS} commit.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800312 * @param commit GetAllLeaderships commit
313 * @return topic to leader mapping
314 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700315 public Map<String, Leadership> allLeaderships(Commit<Void> commit) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700316 Map<String, Leadership> result = new HashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800317 try {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700318 result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
319 return result;
Madan Jampaniea98f412016-06-22 09:05:40 -0700320 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700321 logger().error("State machine operation failed", e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800322 throw new IllegalStateException(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800323 }
324 }
325
326 private Leadership leadership(String topic) {
327 return new Leadership(topic,
328 leader(topic),
329 candidates(topic));
330 }
331
332 private Leader leader(String topic) {
333 ElectionState electionState = elections.get(topic);
334 return electionState == null ? null : electionState.leader();
335 }
336
337 private List<NodeId> candidates(String topic) {
338 ElectionState electionState = elections.get(topic);
339 return electionState == null ? new LinkedList<>() : electionState.candidates();
340 }
341
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700342 private void onSessionEnd(RaftSession session) {
343 listeners.remove(session.sessionId().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800344 Set<String> topics = elections.keySet();
Madan Jampanidb2afd32016-02-23 16:26:45 -0800345 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800346 topics.forEach(topic -> {
347 Leadership oldLeadership = leadership(topic);
Jordan Halterman48d75d62018-02-15 22:24:57 -0800348 elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800349 Leadership newLeadership = leadership(topic);
350 if (!Objects.equal(oldLeadership, newLeadership)) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800351 changes.add(new Change<>(oldLeadership, newLeadership));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800352 }
353 });
Madan Jampanidb2afd32016-02-23 16:26:45 -0800354 notifyLeadershipChanges(changes);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800355 }
356
357 private static class Registration {
358 private final NodeId nodeId;
359 private final long sessionId;
360
361 public Registration(NodeId nodeId, long sessionId) {
362 this.nodeId = nodeId;
363 this.sessionId = sessionId;
364 }
365
366 public NodeId nodeId() {
367 return nodeId;
368 }
369
370 public long sessionId() {
371 return sessionId;
372 }
373
374 @Override
375 public String toString() {
376 return MoreObjects.toStringHelper(getClass())
377 .add("nodeId", nodeId)
378 .add("sessionId", sessionId)
379 .toString();
380 }
381 }
382
Jordan Halterman48d75d62018-02-15 22:24:57 -0800383 private static class ElectionState {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800384 final Registration leader;
385 final long term;
386 final long termStartTime;
387 final List<Registration> registrations;
388
Jordan Halterman48d75d62018-02-15 22:24:57 -0800389 public ElectionState(Registration registration, Supplier<Long> termCounter) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800390 registrations = Arrays.asList(registration);
391 term = termCounter.get();
392 termStartTime = System.currentTimeMillis();
393 leader = registration;
394 }
395
396 public ElectionState(ElectionState other) {
397 registrations = Lists.newArrayList(other.registrations);
398 leader = other.leader;
399 term = other.term;
400 termStartTime = other.termStartTime;
401 }
402
403 public ElectionState(List<Registration> registrations,
404 Registration leader,
405 long term,
Jordan Halterman48d75d62018-02-15 22:24:57 -0800406 long termStartTime) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800407 this.registrations = Lists.newArrayList(registrations);
408 this.leader = leader;
409 this.term = term;
410 this.termStartTime = termStartTime;
411 }
412
Jordan Halterman48d75d62018-02-15 22:24:57 -0800413 public ElectionState cleanup(RaftSession session, Supplier<Long> termCounter) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800414 Optional<Registration> registration =
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700415 registrations.stream().filter(r -> r.sessionId() == session.sessionId().id()).findFirst();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800416 if (registration.isPresent()) {
417 List<Registration> updatedRegistrations =
418 registrations.stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700419 .filter(r -> r.sessionId() != session.sessionId().id())
420 .collect(Collectors.toList());
421 if (leader.sessionId() == session.sessionId().id()) {
Jon Hallcbd1b392017-01-18 20:15:44 -0800422 if (!updatedRegistrations.isEmpty()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800423 return new ElectionState(updatedRegistrations,
424 updatedRegistrations.get(0),
425 termCounter.get(),
Jordan Halterman48d75d62018-02-15 22:24:57 -0800426 System.currentTimeMillis());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800427 } else {
Jordan Halterman48d75d62018-02-15 22:24:57 -0800428 return new ElectionState(updatedRegistrations, null, term, termStartTime);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800429 }
430 } else {
Jordan Halterman48d75d62018-02-15 22:24:57 -0800431 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800432 }
433 } else {
434 return this;
435 }
436 }
437
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800438 public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
439 Optional<Registration> registration =
440 registrations.stream().filter(r -> r.nodeId.equals(nodeId)).findFirst();
441 if (registration.isPresent()) {
442 List<Registration> updatedRegistrations =
443 registrations.stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700444 .filter(r -> !r.nodeId().equals(nodeId))
445 .collect(Collectors.toList());
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800446 if (leader.nodeId().equals(nodeId)) {
Jon Hallcbd1b392017-01-18 20:15:44 -0800447 if (!updatedRegistrations.isEmpty()) {
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800448 return new ElectionState(updatedRegistrations,
449 updatedRegistrations.get(0),
450 termCounter.get(),
Jordan Halterman48d75d62018-02-15 22:24:57 -0800451 System.currentTimeMillis());
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800452 } else {
Jordan Halterman48d75d62018-02-15 22:24:57 -0800453 return new ElectionState(updatedRegistrations, null, term, termStartTime);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800454 }
455 } else {
Jordan Halterman48d75d62018-02-15 22:24:57 -0800456 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800457 }
458 } else {
459 return this;
460 }
461 }
462
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800463 public boolean isDuplicate(Registration registration) {
464 return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
465 }
466
467 public Leader leader() {
468 if (leader == null) {
469 return null;
470 } else {
471 NodeId leaderNodeId = leader.nodeId();
472 return new Leader(leaderNodeId, term, termStartTime);
473 }
474 }
475
476 public List<NodeId> candidates() {
477 return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
478 }
479
Jordan Halterman48d75d62018-02-15 22:24:57 -0800480 public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800481 if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
482 List<Registration> updatedRegistrations = new LinkedList<>(registrations);
483 updatedRegistrations.add(registration);
Jordan Halterman48d75d62018-02-15 22:24:57 -0800484 boolean newLeader = leader == null;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800485 return new ElectionState(updatedRegistrations,
Jordan Halterman48d75d62018-02-15 22:24:57 -0800486 newLeader ? registration : leader,
487 newLeader ? termCounter.get() : term,
488 newLeader ? System.currentTimeMillis() : termStartTime);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800489 }
490 return this;
491 }
492
493 public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
494 Registration newLeader = registrations.stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700495 .filter(r -> r.nodeId().equals(nodeId))
496 .findFirst()
497 .orElse(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800498 if (newLeader != null) {
499 return new ElectionState(registrations,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700500 newLeader,
501 termCounter.incrementAndGet(),
Jordan Halterman48d75d62018-02-15 22:24:57 -0800502 System.currentTimeMillis());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800503 } else {
504 return this;
505 }
506 }
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800507
508 public ElectionState promote(NodeId nodeId) {
509 Registration registration = registrations.stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700510 .filter(r -> r.nodeId().equals(nodeId))
511 .findFirst()
512 .orElse(null);
Madan Jampanic94b4852016-02-23 18:18:37 -0800513 List<Registration> updatedRegistrations = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800514 updatedRegistrations.add(registration);
515 registrations.stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700516 .filter(r -> !r.nodeId().equals(nodeId))
517 .forEach(updatedRegistrations::add);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800518 return new ElectionState(updatedRegistrations,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700519 leader,
520 term,
Jordan Halterman48d75d62018-02-15 22:24:57 -0800521 termStartTime);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800522
523 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800524 }
525
526 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700527 public void onExpire(RaftSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800528 onSessionEnd(session);
529 }
530
531 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700532 public void onClose(RaftSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800533 onSessionEnd(session);
534 }
535
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800536 private AtomicLong termCounter(String topic) {
537 return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
538 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700539}