blob: 33414e777684c454de3692fa64e3b72797225201 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -070019
20import com.google.common.collect.ImmutableSet;
Madan Jampani3a9911c2016-02-21 11:25:45 -080021import io.atomix.copycat.server.session.ServerSession;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080022import io.atomix.copycat.server.Commit;
23import io.atomix.copycat.server.Snapshottable;
24import io.atomix.copycat.server.StateMachineExecutor;
25import io.atomix.copycat.server.session.SessionListener;
26import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
27import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
28import io.atomix.resource.ResourceStateMachine;
29
30import java.util.Arrays;
31import java.util.HashMap;
32import java.util.LinkedHashMap;
33import java.util.LinkedList;
34import java.util.List;
35import java.util.Map;
36import java.util.Optional;
Madan Jampani65f24bb2016-03-15 15:16:18 -070037import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080038import java.util.Set;
39import java.util.concurrent.atomic.AtomicLong;
40import java.util.function.Supplier;
41import java.util.stream.Collectors;
42
43import org.onosproject.cluster.Leader;
44import org.onosproject.cluster.Leadership;
45import org.onosproject.cluster.NodeId;
46import org.onosproject.event.Change;
Madan Jampanifc981772016-02-16 09:46:42 -080047import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080048import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Evict;
Madan Jampanifc981772016-02-16 09:46:42 -080049import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
50import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
51import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
52import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080053import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
Madan Jampanifc981772016-02-16 09:46:42 -080054import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
55import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
56import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080057import org.onosproject.store.serializers.KryoNamespaces;
58import org.onosproject.store.service.Serializer;
59import org.slf4j.Logger;
60
61import com.google.common.base.MoreObjects;
62import com.google.common.base.Objects;
Madan Jampaniea98f412016-06-22 09:05:40 -070063import com.google.common.base.Throwables;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080064import com.google.common.collect.Lists;
65import com.google.common.collect.Maps;
66
67/**
68 * State machine for {@link AtomixLeaderElector} resource.
69 */
70public class AtomixLeaderElectorState extends ResourceStateMachine
71 implements SessionListener, Snapshottable {
72
73 private final Logger log = getLogger(getClass());
74 private Map<String, AtomicLong> termCounters = new HashMap<>();
75 private Map<String, ElectionState> elections = new HashMap<>();
Madan Jampanifc981772016-02-16 09:46:42 -080076 private final Map<Long, Commit<? extends Listen>> listeners = new LinkedHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080077 private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
78 ElectionState.class,
79 Registration.class);
80
Madan Jampani65f24bb2016-03-15 15:16:18 -070081 public AtomixLeaderElectorState(Properties properties) {
82 super(properties);
Madan Jampani86cb2432016-02-17 11:07:56 -080083 }
84
Madan Jampani5e5b3d62016-02-01 16:03:33 -080085 @Override
86 protected void configure(StateMachineExecutor executor) {
87 // Notification
Madan Jampanifc981772016-02-16 09:46:42 -080088 executor.register(Listen.class, this::listen);
89 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080090 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -080091 executor.register(Run.class, this::run);
92 executor.register(Withdraw.class, this::withdraw);
93 executor.register(Anoint.class, this::anoint);
Madan Jampani0c0cdc62016-02-22 16:54:06 -080094 executor.register(Promote.class, this::promote);
95 executor.register(Evict.class, this::evict);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080096 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -080097 executor.register(GetLeadership.class, this::leadership);
98 executor.register(GetAllLeaderships.class, this::allLeaderships);
99 executor.register(GetElectedTopics.class, this::electedTopics);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800100 }
101
102 private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800103 notifyLeadershipChanges(Lists.newArrayList(new Change<>(previousLeadership, newLeadership)));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800104 }
105
106 private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
107 if (changes.isEmpty()) {
108 return;
109 }
110 listeners.values()
111 .forEach(listener -> listener.session()
112 .publish(AtomixLeaderElector.CHANGE_SUBJECT, changes));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800113 }
114
115 @Override
116 public void delete() {
117 // Close and clear Listeners
118 listeners.values().forEach(Commit::close);
119 listeners.clear();
120 }
121
122 /**
123 * Applies listen commits.
124 *
125 * @param commit listen commit
126 */
Madan Jampanifc981772016-02-16 09:46:42 -0800127 public void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800128 if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
129 commit.close();
130 }
131 }
132
133 /**
134 * Applies unlisten commits.
135 *
136 * @param commit unlisten commit
137 */
Madan Jampanifc981772016-02-16 09:46:42 -0800138 public void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800139 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800140 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800141 if (listener != null) {
142 listener.close();
143 }
144 } finally {
145 commit.close();
146 }
147 }
148
149 /**
150 * Applies an {@link AtomixLeaderElectorCommands.Run} commit.
151 * @param commit commit entry
152 * @return topic leader. If no previous leader existed this is the node that just entered the race.
153 */
Madan Jampanifc981772016-02-16 09:46:42 -0800154 public Leadership run(Commit<? extends Run> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800155 try {
156 String topic = commit.operation().topic();
157 Leadership oldLeadership = leadership(topic);
158 Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
159 elections.compute(topic, (k, v) -> {
160 if (v == null) {
161 return new ElectionState(registration, termCounter(topic)::incrementAndGet);
162 } else {
163 if (!v.isDuplicate(registration)) {
164 return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
165 } else {
166 return v;
167 }
168 }
169 });
170 Leadership newLeadership = leadership(topic);
171
172 if (!Objects.equal(oldLeadership, newLeadership)) {
173 notifyLeadershipChange(oldLeadership, newLeadership);
174 }
175 return newLeadership;
Madan Jampaniea98f412016-06-22 09:05:40 -0700176 } catch (Exception e) {
177 log.error("State machine operation failed", e);
178 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800179 } finally {
180 commit.close();
181 }
182 }
183
184 /**
185 * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
186 * @param commit withdraw commit
187 */
Madan Jampanifc981772016-02-16 09:46:42 -0800188 public void withdraw(Commit<? extends Withdraw> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800189 try {
190 String topic = commit.operation().topic();
191 Leadership oldLeadership = leadership(topic);
192 elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
193 termCounter(topic)::incrementAndGet));
194 Leadership newLeadership = leadership(topic);
195 if (!Objects.equal(oldLeadership, newLeadership)) {
196 notifyLeadershipChange(oldLeadership, newLeadership);
197 }
Madan Jampaniea98f412016-06-22 09:05:40 -0700198 } catch (Exception e) {
199 log.error("State machine operation failed", e);
200 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800201 } finally {
202 commit.close();
203 }
204 }
205
206 /**
207 * Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
208 * @param commit anoint commit
209 * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
210 */
Madan Jampanifc981772016-02-16 09:46:42 -0800211 public boolean anoint(Commit<? extends Anoint> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800212 try {
213 String topic = commit.operation().topic();
Madan Jampanic94b4852016-02-23 18:18:37 -0800214 NodeId nodeId = commit.operation().nodeId();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800215 Leadership oldLeadership = leadership(topic);
216 ElectionState electionState = elections.computeIfPresent(topic,
Madan Jampanic94b4852016-02-23 18:18:37 -0800217 (k, v) -> v.transferLeadership(nodeId, termCounter(topic)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800218 Leadership newLeadership = leadership(topic);
219 if (!Objects.equal(oldLeadership, newLeadership)) {
220 notifyLeadershipChange(oldLeadership, newLeadership);
221 }
222 return (electionState != null &&
223 electionState.leader() != null &&
224 commit.operation().nodeId().equals(electionState.leader().nodeId()));
Madan Jampaniea98f412016-06-22 09:05:40 -0700225 } catch (Exception e) {
226 log.error("State machine operation failed", e);
227 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800228 } finally {
229 commit.close();
230 }
231 }
232
233 /**
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800234 * Applies an {@link AtomixLeaderElectorCommands.Promote} commit.
235 * @param commit promote commit
236 * @return {@code true} if changes desired end state is achieved.
237 */
238 public boolean promote(Commit<? extends Promote> commit) {
239 try {
240 String topic = commit.operation().topic();
241 NodeId nodeId = commit.operation().nodeId();
242 Leadership oldLeadership = leadership(topic);
243 if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
244 return false;
245 }
Madan Jampanic94b4852016-02-23 18:18:37 -0800246 elections.computeIfPresent(topic, (k, v) -> v.promote(nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800247 Leadership newLeadership = leadership(topic);
248 if (!Objects.equal(oldLeadership, newLeadership)) {
249 notifyLeadershipChange(oldLeadership, newLeadership);
250 }
251 return true;
Madan Jampaniea98f412016-06-22 09:05:40 -0700252 } catch (Exception e) {
253 log.error("State machine operation failed", e);
254 throw Throwables.propagate(e);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800255 } finally {
256 commit.close();
257 }
258 }
259
260 /**
261 * Applies an {@link AtomixLeaderElectorCommands.Evict} commit.
262 * @param commit evict commit
263 */
264 public void evict(Commit<? extends Evict> commit) {
265 try {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800266 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800267 NodeId nodeId = commit.operation().nodeId();
268 Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
269 topics.forEach(topic -> {
270 Leadership oldLeadership = leadership(topic);
271 elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
272 Leadership newLeadership = leadership(topic);
273 if (!Objects.equal(oldLeadership, newLeadership)) {
274 changes.add(new Change<>(oldLeadership, newLeadership));
275 }
276 });
277 notifyLeadershipChanges(changes);
Madan Jampaniea98f412016-06-22 09:05:40 -0700278 } catch (Exception e) {
279 log.error("State machine operation failed", e);
280 throw Throwables.propagate(e);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800281 } finally {
282 commit.close();
283 }
284 }
285
286 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800287 * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
288 * @param commit GetLeadership commit
289 * @return leader
290 */
Madan Jampanifc981772016-02-16 09:46:42 -0800291 public Leadership leadership(Commit<? extends GetLeadership> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800292 String topic = commit.operation().topic();
293 try {
294 return leadership(topic);
Madan Jampaniea98f412016-06-22 09:05:40 -0700295 } catch (Exception e) {
296 log.error("State machine operation failed", e);
297 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800298 } finally {
299 commit.close();
300 }
301 }
302
303 /**
304 * Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
305 * @param commit commit entry
306 * @return set of topics for which the node is the leader
307 */
Madan Jampanifc981772016-02-16 09:46:42 -0800308 public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800309 try {
310 NodeId nodeId = commit.operation().nodeId();
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700311 return ImmutableSet.copyOf(Maps.filterEntries(elections, e -> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800312 Leader leader = leadership(e.getKey()).leader();
313 return leader != null && leader.nodeId().equals(nodeId);
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700314 }).keySet());
Madan Jampaniea98f412016-06-22 09:05:40 -0700315 } catch (Exception e) {
316 log.error("State machine operation failed", e);
317 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800318 } finally {
319 commit.close();
320 }
321 }
322
323 /**
324 * Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
325 * @param commit GetAllLeaderships commit
326 * @return topic to leader mapping
327 */
Madan Jampanifc981772016-02-16 09:46:42 -0800328 public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700329 Map<String, Leadership> result = new HashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800330 try {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700331 result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
332 return result;
Madan Jampaniea98f412016-06-22 09:05:40 -0700333 } catch (Exception e) {
334 log.error("State machine operation failed", e);
335 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800336 } finally {
337 commit.close();
338 }
339 }
340
341 private Leadership leadership(String topic) {
342 return new Leadership(topic,
343 leader(topic),
344 candidates(topic));
345 }
346
347 private Leader leader(String topic) {
348 ElectionState electionState = elections.get(topic);
349 return electionState == null ? null : electionState.leader();
350 }
351
352 private List<NodeId> candidates(String topic) {
353 ElectionState electionState = elections.get(topic);
354 return electionState == null ? new LinkedList<>() : electionState.candidates();
355 }
356
Madan Jampani3a9911c2016-02-21 11:25:45 -0800357 private void onSessionEnd(ServerSession session) {
Madan Jampani86cb2432016-02-17 11:07:56 -0800358 Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800359 if (listener != null) {
360 listener.close();
361 }
362 Set<String> topics = elections.keySet();
Madan Jampanidb2afd32016-02-23 16:26:45 -0800363 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800364 topics.forEach(topic -> {
365 Leadership oldLeadership = leadership(topic);
366 elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
367 Leadership newLeadership = leadership(topic);
368 if (!Objects.equal(oldLeadership, newLeadership)) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800369 changes.add(new Change<>(oldLeadership, newLeadership));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800370 }
371 });
Madan Jampanidb2afd32016-02-23 16:26:45 -0800372 notifyLeadershipChanges(changes);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800373 }
374
375 private static class Registration {
376 private final NodeId nodeId;
377 private final long sessionId;
378
379 public Registration(NodeId nodeId, long sessionId) {
380 this.nodeId = nodeId;
381 this.sessionId = sessionId;
382 }
383
384 public NodeId nodeId() {
385 return nodeId;
386 }
387
388 public long sessionId() {
389 return sessionId;
390 }
391
392 @Override
393 public String toString() {
394 return MoreObjects.toStringHelper(getClass())
395 .add("nodeId", nodeId)
396 .add("sessionId", sessionId)
397 .toString();
398 }
399 }
400
401 private static class ElectionState {
402 final Registration leader;
403 final long term;
404 final long termStartTime;
405 final List<Registration> registrations;
406
407 public ElectionState(Registration registration, Supplier<Long> termCounter) {
408 registrations = Arrays.asList(registration);
409 term = termCounter.get();
410 termStartTime = System.currentTimeMillis();
411 leader = registration;
412 }
413
414 public ElectionState(ElectionState other) {
415 registrations = Lists.newArrayList(other.registrations);
416 leader = other.leader;
417 term = other.term;
418 termStartTime = other.termStartTime;
419 }
420
421 public ElectionState(List<Registration> registrations,
422 Registration leader,
423 long term,
424 long termStartTime) {
425 this.registrations = Lists.newArrayList(registrations);
426 this.leader = leader;
427 this.term = term;
428 this.termStartTime = termStartTime;
429 }
430
Madan Jampani3a9911c2016-02-21 11:25:45 -0800431 public ElectionState cleanup(ServerSession session, Supplier<Long> termCounter) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800432 Optional<Registration> registration =
433 registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
434 if (registration.isPresent()) {
435 List<Registration> updatedRegistrations =
436 registrations.stream()
437 .filter(r -> r.sessionId() != session.id())
438 .collect(Collectors.toList());
439 if (leader.sessionId() == session.id()) {
440 if (updatedRegistrations.size() > 0) {
441 return new ElectionState(updatedRegistrations,
442 updatedRegistrations.get(0),
443 termCounter.get(),
444 System.currentTimeMillis());
445 } else {
446 return new ElectionState(updatedRegistrations, null, term, termStartTime);
447 }
448 } else {
449 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
450 }
451 } else {
452 return this;
453 }
454 }
455
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800456 public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
457 Optional<Registration> registration =
458 registrations.stream().filter(r -> r.nodeId.equals(nodeId)).findFirst();
459 if (registration.isPresent()) {
460 List<Registration> updatedRegistrations =
461 registrations.stream()
462 .filter(r -> !r.nodeId().equals(nodeId))
463 .collect(Collectors.toList());
464 if (leader.nodeId().equals(nodeId)) {
465 if (updatedRegistrations.size() > 0) {
466 return new ElectionState(updatedRegistrations,
467 updatedRegistrations.get(0),
468 termCounter.get(),
469 System.currentTimeMillis());
470 } else {
471 return new ElectionState(updatedRegistrations, null, term, termStartTime);
472 }
473 } else {
474 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
475 }
476 } else {
477 return this;
478 }
479 }
480
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800481 public boolean isDuplicate(Registration registration) {
482 return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
483 }
484
485 public Leader leader() {
486 if (leader == null) {
487 return null;
488 } else {
489 NodeId leaderNodeId = leader.nodeId();
490 return new Leader(leaderNodeId, term, termStartTime);
491 }
492 }
493
494 public List<NodeId> candidates() {
495 return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
496 }
497
498 public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
499 if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
500 List<Registration> updatedRegistrations = new LinkedList<>(registrations);
501 updatedRegistrations.add(registration);
502 boolean newLeader = leader == null;
503 return new ElectionState(updatedRegistrations,
504 newLeader ? registration : leader,
505 newLeader ? termCounter.get() : term,
506 newLeader ? System.currentTimeMillis() : termStartTime);
507 }
508 return this;
509 }
510
511 public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
512 Registration newLeader = registrations.stream()
513 .filter(r -> r.nodeId().equals(nodeId))
514 .findFirst()
515 .orElse(null);
516 if (newLeader != null) {
517 return new ElectionState(registrations,
518 newLeader,
519 termCounter.incrementAndGet(),
520 System.currentTimeMillis());
521 } else {
522 return this;
523 }
524 }
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800525
526 public ElectionState promote(NodeId nodeId) {
527 Registration registration = registrations.stream()
528 .filter(r -> r.nodeId().equals(nodeId))
529 .findFirst()
530 .orElse(null);
Madan Jampanic94b4852016-02-23 18:18:37 -0800531 List<Registration> updatedRegistrations = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800532 updatedRegistrations.add(registration);
533 registrations.stream()
534 .filter(r -> !r.nodeId().equals(nodeId))
535 .forEach(updatedRegistrations::add);
536 return new ElectionState(updatedRegistrations,
537 leader,
538 term,
539 termStartTime);
540
541 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800542 }
543
544 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800545 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800546 }
547
548 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800549 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800550 onSessionEnd(session);
551 }
552
553 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800554 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800555 onSessionEnd(session);
556 }
557
558 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800559 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800560 onSessionEnd(session);
561 }
562
563 @Override
564 public void snapshot(SnapshotWriter writer) {
565 byte[] encodedTermCounters = serializer.encode(termCounters);
566 writer.writeInt(encodedTermCounters.length);
567 writer.write(encodedTermCounters);
568 byte[] encodedElections = serializer.encode(elections);
569 writer.writeInt(encodedElections.length);
570 writer.write(encodedElections);
Madan Jampani630e7ac2016-05-31 11:34:05 -0700571 log.debug("Took state machine snapshot");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800572 }
573
574 @Override
575 public void install(SnapshotReader reader) {
576 int encodedTermCountersSize = reader.readInt();
577 byte[] encodedTermCounters = new byte[encodedTermCountersSize];
578 reader.read(encodedTermCounters);
579 termCounters = serializer.decode(encodedTermCounters);
580 int encodedElectionsSize = reader.readInt();
581 byte[] encodedElections = new byte[encodedElectionsSize];
582 reader.read(encodedElections);
583 elections = serializer.decode(encodedElections);
Madan Jampani630e7ac2016-05-31 11:34:05 -0700584 log.debug("Reinstated state machine from snapshot");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800585 }
586
587 private AtomicLong termCounter(String topic) {
588 return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
589 }
590}