blob: 369c191bcffdea42c546c6e516fd39cecaa166e5 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070019
20import com.google.common.collect.ImmutableSet;
Madan Jampani3a9911c2016-02-21 11:25:45 -080021import io.atomix.copycat.server.session.ServerSession;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080022import io.atomix.copycat.server.Commit;
23import io.atomix.copycat.server.Snapshottable;
24import io.atomix.copycat.server.StateMachineExecutor;
25import io.atomix.copycat.server.session.SessionListener;
26import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
27import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
28import io.atomix.resource.ResourceStateMachine;
29
30import java.util.Arrays;
31import java.util.HashMap;
32import java.util.LinkedHashMap;
33import java.util.LinkedList;
34import java.util.List;
35import java.util.Map;
36import java.util.Optional;
Madan Jampani65f24bb2016-03-15 15:16:18 -070037import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080038import java.util.Set;
39import java.util.concurrent.atomic.AtomicLong;
40import java.util.function.Supplier;
41import java.util.stream.Collectors;
42
43import org.onosproject.cluster.Leader;
44import org.onosproject.cluster.Leadership;
45import org.onosproject.cluster.NodeId;
46import org.onosproject.event.Change;
Madan Jampanifc981772016-02-16 09:46:42 -080047import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080048import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Evict;
Madan Jampanifc981772016-02-16 09:46:42 -080049import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
50import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
51import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
52import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080053import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
Madan Jampanifc981772016-02-16 09:46:42 -080054import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
55import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
56import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080057import org.onosproject.store.serializers.KryoNamespaces;
58import org.onosproject.store.service.Serializer;
59import org.slf4j.Logger;
60
61import com.google.common.base.MoreObjects;
62import com.google.common.base.Objects;
Madan Jampaniea98f412016-06-22 09:05:40 -070063import com.google.common.base.Throwables;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080064import com.google.common.collect.Lists;
65import com.google.common.collect.Maps;
66
67/**
68 * State machine for {@link AtomixLeaderElector} resource.
69 */
70public class AtomixLeaderElectorState extends ResourceStateMachine
71 implements SessionListener, Snapshottable {
72
73 private final Logger log = getLogger(getClass());
74 private Map<String, AtomicLong> termCounters = new HashMap<>();
75 private Map<String, ElectionState> elections = new HashMap<>();
Madan Jampanifc981772016-02-16 09:46:42 -080076 private final Map<Long, Commit<? extends Listen>> listeners = new LinkedHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080077 private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
78 ElectionState.class,
79 Registration.class);
80
Madan Jampani65f24bb2016-03-15 15:16:18 -070081 public AtomixLeaderElectorState(Properties properties) {
82 super(properties);
Madan Jampani86cb2432016-02-17 11:07:56 -080083 }
84
Madan Jampani5e5b3d62016-02-01 16:03:33 -080085 @Override
86 protected void configure(StateMachineExecutor executor) {
87 // Notification
Madan Jampanifc981772016-02-16 09:46:42 -080088 executor.register(Listen.class, this::listen);
89 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080090 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -080091 executor.register(Run.class, this::run);
92 executor.register(Withdraw.class, this::withdraw);
93 executor.register(Anoint.class, this::anoint);
Madan Jampani0c0cdc62016-02-22 16:54:06 -080094 executor.register(Promote.class, this::promote);
95 executor.register(Evict.class, this::evict);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080096 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -080097 executor.register(GetLeadership.class, this::leadership);
98 executor.register(GetAllLeaderships.class, this::allLeaderships);
99 executor.register(GetElectedTopics.class, this::electedTopics);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800100 }
101
102 private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800103 notifyLeadershipChanges(Lists.newArrayList(new Change<>(previousLeadership, newLeadership)));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800104 }
105
106 private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
107 if (changes.isEmpty()) {
108 return;
109 }
110 listeners.values()
111 .forEach(listener -> listener.session()
112 .publish(AtomixLeaderElector.CHANGE_SUBJECT, changes));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800113 }
114
115 @Override
116 public void delete() {
117 // Close and clear Listeners
118 listeners.values().forEach(Commit::close);
119 listeners.clear();
120 }
121
122 /**
123 * Applies listen commits.
124 *
125 * @param commit listen commit
126 */
Madan Jampanifc981772016-02-16 09:46:42 -0800127 public void listen(Commit<? extends Listen> commit) {
Deepa Vaddireddy289ff7f2017-01-04 09:34:23 +0530128 Long sessionId = commit.session().id();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800129 if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
130 commit.close();
131 }
Deepa Vaddireddy289ff7f2017-01-04 09:34:23 +0530132 commit.session()
133 .onStateChange(
134 state -> {
135 if (state == ServerSession.State.CLOSED
136 || state == ServerSession.State.EXPIRED) {
137 Commit<? extends Listen> listener = listeners.remove(sessionId);
138 if (listener != null) {
139 listener.close();
140 }
141 }
142 });
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800143 }
144
145 /**
146 * Applies unlisten commits.
147 *
148 * @param commit unlisten commit
149 */
Madan Jampanifc981772016-02-16 09:46:42 -0800150 public void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800151 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800152 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800153 if (listener != null) {
154 listener.close();
155 }
156 } finally {
157 commit.close();
158 }
159 }
160
161 /**
162 * Applies an {@link AtomixLeaderElectorCommands.Run} commit.
163 * @param commit commit entry
164 * @return topic leader. If no previous leader existed this is the node that just entered the race.
165 */
Madan Jampanifc981772016-02-16 09:46:42 -0800166 public Leadership run(Commit<? extends Run> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800167 try {
168 String topic = commit.operation().topic();
169 Leadership oldLeadership = leadership(topic);
170 Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
171 elections.compute(topic, (k, v) -> {
172 if (v == null) {
173 return new ElectionState(registration, termCounter(topic)::incrementAndGet);
174 } else {
175 if (!v.isDuplicate(registration)) {
176 return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
177 } else {
178 return v;
179 }
180 }
181 });
182 Leadership newLeadership = leadership(topic);
183
184 if (!Objects.equal(oldLeadership, newLeadership)) {
185 notifyLeadershipChange(oldLeadership, newLeadership);
186 }
187 return newLeadership;
Madan Jampaniea98f412016-06-22 09:05:40 -0700188 } catch (Exception e) {
189 log.error("State machine operation failed", e);
190 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800191 } finally {
192 commit.close();
193 }
194 }
195
196 /**
197 * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
198 * @param commit withdraw commit
199 */
Madan Jampanifc981772016-02-16 09:46:42 -0800200 public void withdraw(Commit<? extends Withdraw> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800201 try {
202 String topic = commit.operation().topic();
203 Leadership oldLeadership = leadership(topic);
204 elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
205 termCounter(topic)::incrementAndGet));
206 Leadership newLeadership = leadership(topic);
207 if (!Objects.equal(oldLeadership, newLeadership)) {
208 notifyLeadershipChange(oldLeadership, newLeadership);
209 }
Madan Jampaniea98f412016-06-22 09:05:40 -0700210 } catch (Exception e) {
211 log.error("State machine operation failed", e);
212 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800213 } finally {
214 commit.close();
215 }
216 }
217
218 /**
219 * Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
220 * @param commit anoint commit
221 * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
222 */
Madan Jampanifc981772016-02-16 09:46:42 -0800223 public boolean anoint(Commit<? extends Anoint> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800224 try {
225 String topic = commit.operation().topic();
Madan Jampanic94b4852016-02-23 18:18:37 -0800226 NodeId nodeId = commit.operation().nodeId();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800227 Leadership oldLeadership = leadership(topic);
228 ElectionState electionState = elections.computeIfPresent(topic,
Madan Jampanic94b4852016-02-23 18:18:37 -0800229 (k, v) -> v.transferLeadership(nodeId, termCounter(topic)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800230 Leadership newLeadership = leadership(topic);
231 if (!Objects.equal(oldLeadership, newLeadership)) {
232 notifyLeadershipChange(oldLeadership, newLeadership);
233 }
234 return (electionState != null &&
235 electionState.leader() != null &&
236 commit.operation().nodeId().equals(electionState.leader().nodeId()));
Madan Jampaniea98f412016-06-22 09:05:40 -0700237 } catch (Exception e) {
238 log.error("State machine operation failed", e);
239 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800240 } finally {
241 commit.close();
242 }
243 }
244
245 /**
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800246 * Applies an {@link AtomixLeaderElectorCommands.Promote} commit.
247 * @param commit promote commit
248 * @return {@code true} if changes desired end state is achieved.
249 */
250 public boolean promote(Commit<? extends Promote> commit) {
251 try {
252 String topic = commit.operation().topic();
253 NodeId nodeId = commit.operation().nodeId();
254 Leadership oldLeadership = leadership(topic);
255 if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
256 return false;
257 }
Madan Jampanic94b4852016-02-23 18:18:37 -0800258 elections.computeIfPresent(topic, (k, v) -> v.promote(nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800259 Leadership newLeadership = leadership(topic);
260 if (!Objects.equal(oldLeadership, newLeadership)) {
261 notifyLeadershipChange(oldLeadership, newLeadership);
262 }
263 return true;
Madan Jampaniea98f412016-06-22 09:05:40 -0700264 } catch (Exception e) {
265 log.error("State machine operation failed", e);
266 throw Throwables.propagate(e);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800267 } finally {
268 commit.close();
269 }
270 }
271
272 /**
273 * Applies an {@link AtomixLeaderElectorCommands.Evict} commit.
274 * @param commit evict commit
275 */
276 public void evict(Commit<? extends Evict> commit) {
277 try {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800278 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800279 NodeId nodeId = commit.operation().nodeId();
280 Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
281 topics.forEach(topic -> {
282 Leadership oldLeadership = leadership(topic);
283 elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
284 Leadership newLeadership = leadership(topic);
285 if (!Objects.equal(oldLeadership, newLeadership)) {
286 changes.add(new Change<>(oldLeadership, newLeadership));
287 }
288 });
289 notifyLeadershipChanges(changes);
Madan Jampaniea98f412016-06-22 09:05:40 -0700290 } catch (Exception e) {
291 log.error("State machine operation failed", e);
292 throw Throwables.propagate(e);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800293 } finally {
294 commit.close();
295 }
296 }
297
298 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800299 * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
300 * @param commit GetLeadership commit
301 * @return leader
302 */
Madan Jampanifc981772016-02-16 09:46:42 -0800303 public Leadership leadership(Commit<? extends GetLeadership> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800304 String topic = commit.operation().topic();
305 try {
306 return leadership(topic);
Madan Jampaniea98f412016-06-22 09:05:40 -0700307 } catch (Exception e) {
308 log.error("State machine operation failed", e);
309 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800310 } finally {
311 commit.close();
312 }
313 }
314
315 /**
316 * Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
317 * @param commit commit entry
318 * @return set of topics for which the node is the leader
319 */
Madan Jampanifc981772016-02-16 09:46:42 -0800320 public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800321 try {
322 NodeId nodeId = commit.operation().nodeId();
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700323 return ImmutableSet.copyOf(Maps.filterEntries(elections, e -> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800324 Leader leader = leadership(e.getKey()).leader();
325 return leader != null && leader.nodeId().equals(nodeId);
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700326 }).keySet());
Madan Jampaniea98f412016-06-22 09:05:40 -0700327 } catch (Exception e) {
328 log.error("State machine operation failed", e);
329 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800330 } finally {
331 commit.close();
332 }
333 }
334
335 /**
336 * Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
337 * @param commit GetAllLeaderships commit
338 * @return topic to leader mapping
339 */
Madan Jampanifc981772016-02-16 09:46:42 -0800340 public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700341 Map<String, Leadership> result = new HashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800342 try {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700343 result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
344 return result;
Madan Jampaniea98f412016-06-22 09:05:40 -0700345 } catch (Exception e) {
346 log.error("State machine operation failed", e);
347 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800348 } finally {
349 commit.close();
350 }
351 }
352
353 private Leadership leadership(String topic) {
354 return new Leadership(topic,
355 leader(topic),
356 candidates(topic));
357 }
358
359 private Leader leader(String topic) {
360 ElectionState electionState = elections.get(topic);
361 return electionState == null ? null : electionState.leader();
362 }
363
364 private List<NodeId> candidates(String topic) {
365 ElectionState electionState = elections.get(topic);
366 return electionState == null ? new LinkedList<>() : electionState.candidates();
367 }
368
Madan Jampani3a9911c2016-02-21 11:25:45 -0800369 private void onSessionEnd(ServerSession session) {
Madan Jampani86cb2432016-02-17 11:07:56 -0800370 Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800371 if (listener != null) {
372 listener.close();
373 }
374 Set<String> topics = elections.keySet();
Madan Jampanidb2afd32016-02-23 16:26:45 -0800375 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800376 topics.forEach(topic -> {
377 Leadership oldLeadership = leadership(topic);
378 elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
379 Leadership newLeadership = leadership(topic);
380 if (!Objects.equal(oldLeadership, newLeadership)) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800381 changes.add(new Change<>(oldLeadership, newLeadership));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800382 }
383 });
Madan Jampanidb2afd32016-02-23 16:26:45 -0800384 notifyLeadershipChanges(changes);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800385 }
386
387 private static class Registration {
388 private final NodeId nodeId;
389 private final long sessionId;
390
391 public Registration(NodeId nodeId, long sessionId) {
392 this.nodeId = nodeId;
393 this.sessionId = sessionId;
394 }
395
396 public NodeId nodeId() {
397 return nodeId;
398 }
399
400 public long sessionId() {
401 return sessionId;
402 }
403
404 @Override
405 public String toString() {
406 return MoreObjects.toStringHelper(getClass())
407 .add("nodeId", nodeId)
408 .add("sessionId", sessionId)
409 .toString();
410 }
411 }
412
413 private static class ElectionState {
414 final Registration leader;
415 final long term;
416 final long termStartTime;
417 final List<Registration> registrations;
418
419 public ElectionState(Registration registration, Supplier<Long> termCounter) {
420 registrations = Arrays.asList(registration);
421 term = termCounter.get();
422 termStartTime = System.currentTimeMillis();
423 leader = registration;
424 }
425
426 public ElectionState(ElectionState other) {
427 registrations = Lists.newArrayList(other.registrations);
428 leader = other.leader;
429 term = other.term;
430 termStartTime = other.termStartTime;
431 }
432
433 public ElectionState(List<Registration> registrations,
434 Registration leader,
435 long term,
436 long termStartTime) {
437 this.registrations = Lists.newArrayList(registrations);
438 this.leader = leader;
439 this.term = term;
440 this.termStartTime = termStartTime;
441 }
442
Madan Jampani3a9911c2016-02-21 11:25:45 -0800443 public ElectionState cleanup(ServerSession session, Supplier<Long> termCounter) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800444 Optional<Registration> registration =
445 registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
446 if (registration.isPresent()) {
447 List<Registration> updatedRegistrations =
448 registrations.stream()
449 .filter(r -> r.sessionId() != session.id())
450 .collect(Collectors.toList());
451 if (leader.sessionId() == session.id()) {
Jon Hallcbd1b392017-01-18 20:15:44 -0800452 if (!updatedRegistrations.isEmpty()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800453 return new ElectionState(updatedRegistrations,
454 updatedRegistrations.get(0),
455 termCounter.get(),
456 System.currentTimeMillis());
457 } else {
458 return new ElectionState(updatedRegistrations, null, term, termStartTime);
459 }
460 } else {
461 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
462 }
463 } else {
464 return this;
465 }
466 }
467
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800468 public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
469 Optional<Registration> registration =
470 registrations.stream().filter(r -> r.nodeId.equals(nodeId)).findFirst();
471 if (registration.isPresent()) {
472 List<Registration> updatedRegistrations =
473 registrations.stream()
474 .filter(r -> !r.nodeId().equals(nodeId))
475 .collect(Collectors.toList());
476 if (leader.nodeId().equals(nodeId)) {
Jon Hallcbd1b392017-01-18 20:15:44 -0800477 if (!updatedRegistrations.isEmpty()) {
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800478 return new ElectionState(updatedRegistrations,
479 updatedRegistrations.get(0),
480 termCounter.get(),
481 System.currentTimeMillis());
482 } else {
483 return new ElectionState(updatedRegistrations, null, term, termStartTime);
484 }
485 } else {
486 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
487 }
488 } else {
489 return this;
490 }
491 }
492
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800493 public boolean isDuplicate(Registration registration) {
494 return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
495 }
496
497 public Leader leader() {
498 if (leader == null) {
499 return null;
500 } else {
501 NodeId leaderNodeId = leader.nodeId();
502 return new Leader(leaderNodeId, term, termStartTime);
503 }
504 }
505
506 public List<NodeId> candidates() {
507 return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
508 }
509
510 public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
511 if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
512 List<Registration> updatedRegistrations = new LinkedList<>(registrations);
513 updatedRegistrations.add(registration);
514 boolean newLeader = leader == null;
515 return new ElectionState(updatedRegistrations,
516 newLeader ? registration : leader,
517 newLeader ? termCounter.get() : term,
518 newLeader ? System.currentTimeMillis() : termStartTime);
519 }
520 return this;
521 }
522
523 public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
524 Registration newLeader = registrations.stream()
525 .filter(r -> r.nodeId().equals(nodeId))
526 .findFirst()
527 .orElse(null);
528 if (newLeader != null) {
529 return new ElectionState(registrations,
530 newLeader,
531 termCounter.incrementAndGet(),
532 System.currentTimeMillis());
533 } else {
534 return this;
535 }
536 }
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800537
538 public ElectionState promote(NodeId nodeId) {
539 Registration registration = registrations.stream()
540 .filter(r -> r.nodeId().equals(nodeId))
541 .findFirst()
542 .orElse(null);
Madan Jampanic94b4852016-02-23 18:18:37 -0800543 List<Registration> updatedRegistrations = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800544 updatedRegistrations.add(registration);
545 registrations.stream()
546 .filter(r -> !r.nodeId().equals(nodeId))
547 .forEach(updatedRegistrations::add);
548 return new ElectionState(updatedRegistrations,
549 leader,
550 term,
551 termStartTime);
552
553 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800554 }
555
556 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800557 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800558 }
559
560 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800561 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800562 onSessionEnd(session);
563 }
564
565 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800566 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800567 onSessionEnd(session);
568 }
569
570 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800571 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800572 onSessionEnd(session);
573 }
574
575 @Override
576 public void snapshot(SnapshotWriter writer) {
577 byte[] encodedTermCounters = serializer.encode(termCounters);
578 writer.writeInt(encodedTermCounters.length);
579 writer.write(encodedTermCounters);
580 byte[] encodedElections = serializer.encode(elections);
581 writer.writeInt(encodedElections.length);
582 writer.write(encodedElections);
Madan Jampani630e7ac2016-05-31 11:34:05 -0700583 log.debug("Took state machine snapshot");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800584 }
585
586 @Override
587 public void install(SnapshotReader reader) {
588 int encodedTermCountersSize = reader.readInt();
589 byte[] encodedTermCounters = new byte[encodedTermCountersSize];
590 reader.read(encodedTermCounters);
591 termCounters = serializer.decode(encodedTermCounters);
592 int encodedElectionsSize = reader.readInt();
593 byte[] encodedElections = new byte[encodedElectionsSize];
594 reader.read(encodedElections);
595 elections = serializer.decode(encodedElections);
Madan Jampani630e7ac2016-05-31 11:34:05 -0700596 log.debug("Reinstated state machine from snapshot");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800597 }
598
599 private AtomicLong termCounter(String topic) {
600 return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
601 }
602}