blob: df04e6d81e7fb2f952b0b3978cd2dab331194724 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani3a9911c2016-02-21 11:25:45 -080019import io.atomix.copycat.server.session.ServerSession;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.copycat.server.Commit;
21import io.atomix.copycat.server.Snapshottable;
22import io.atomix.copycat.server.StateMachineExecutor;
23import io.atomix.copycat.server.session.SessionListener;
24import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
25import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
26import io.atomix.resource.ResourceStateMachine;
27
28import java.util.Arrays;
29import java.util.HashMap;
30import java.util.LinkedHashMap;
31import java.util.LinkedList;
32import java.util.List;
33import java.util.Map;
34import java.util.Optional;
Madan Jampani65f24bb2016-03-15 15:16:18 -070035import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080036import java.util.Set;
37import java.util.concurrent.atomic.AtomicLong;
38import java.util.function.Supplier;
39import java.util.stream.Collectors;
40
41import org.onosproject.cluster.Leader;
42import org.onosproject.cluster.Leadership;
43import org.onosproject.cluster.NodeId;
44import org.onosproject.event.Change;
Madan Jampanifc981772016-02-16 09:46:42 -080045import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080046import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Evict;
Madan Jampanifc981772016-02-16 09:46:42 -080047import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
48import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
49import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
50import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080051import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
Madan Jampanifc981772016-02-16 09:46:42 -080052import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
53import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
54import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055import org.onosproject.store.serializers.KryoNamespaces;
56import org.onosproject.store.service.Serializer;
57import org.slf4j.Logger;
58
59import com.google.common.base.MoreObjects;
60import com.google.common.base.Objects;
61import com.google.common.collect.Lists;
62import com.google.common.collect.Maps;
63
64/**
65 * State machine for {@link AtomixLeaderElector} resource.
66 */
67public class AtomixLeaderElectorState extends ResourceStateMachine
68 implements SessionListener, Snapshottable {
69
70 private final Logger log = getLogger(getClass());
71 private Map<String, AtomicLong> termCounters = new HashMap<>();
72 private Map<String, ElectionState> elections = new HashMap<>();
Madan Jampanifc981772016-02-16 09:46:42 -080073 private final Map<Long, Commit<? extends Listen>> listeners = new LinkedHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080074 private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
75 ElectionState.class,
76 Registration.class);
77
/**
 * Creates a leader elector state machine.
 *
 * @param properties properties passed through to the parent state machine
 */
public AtomixLeaderElectorState(Properties properties) {
    super(properties);
}
81
Madan Jampani5e5b3d62016-02-01 16:03:33 -080082 @Override
83 protected void configure(StateMachineExecutor executor) {
84 // Notification
Madan Jampanifc981772016-02-16 09:46:42 -080085 executor.register(Listen.class, this::listen);
86 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080087 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -080088 executor.register(Run.class, this::run);
89 executor.register(Withdraw.class, this::withdraw);
90 executor.register(Anoint.class, this::anoint);
Madan Jampani0c0cdc62016-02-22 16:54:06 -080091 executor.register(Promote.class, this::promote);
92 executor.register(Evict.class, this::evict);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080093 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -080094 executor.register(GetLeadership.class, this::leadership);
95 executor.register(GetAllLeaderships.class, this::allLeaderships);
96 executor.register(GetElectedTopics.class, this::electedTopics);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080097 }
98
99 private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800100 notifyLeadershipChanges(Lists.newArrayList(new Change<>(previousLeadership, newLeadership)));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800101 }
102
103 private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
104 if (changes.isEmpty()) {
105 return;
106 }
107 listeners.values()
108 .forEach(listener -> listener.session()
109 .publish(AtomixLeaderElector.CHANGE_SUBJECT, changes));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800110 }
111
112 @Override
113 public void delete() {
114 // Close and clear Listeners
115 listeners.values().forEach(Commit::close);
116 listeners.clear();
117 }
118
119 /**
120 * Applies listen commits.
121 *
122 * @param commit listen commit
123 */
Madan Jampanifc981772016-02-16 09:46:42 -0800124 public void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800125 if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
126 commit.close();
127 }
128 }
129
130 /**
131 * Applies unlisten commits.
132 *
133 * @param commit unlisten commit
134 */
Madan Jampanifc981772016-02-16 09:46:42 -0800135 public void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800136 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800137 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800138 if (listener != null) {
139 listener.close();
140 }
141 } finally {
142 commit.close();
143 }
144 }
145
146 /**
147 * Applies an {@link AtomixLeaderElectorCommands.Run} commit.
148 * @param commit commit entry
149 * @return topic leader. If no previous leader existed this is the node that just entered the race.
150 */
Madan Jampanifc981772016-02-16 09:46:42 -0800151 public Leadership run(Commit<? extends Run> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800152 try {
153 String topic = commit.operation().topic();
154 Leadership oldLeadership = leadership(topic);
155 Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
156 elections.compute(topic, (k, v) -> {
157 if (v == null) {
158 return new ElectionState(registration, termCounter(topic)::incrementAndGet);
159 } else {
160 if (!v.isDuplicate(registration)) {
161 return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
162 } else {
163 return v;
164 }
165 }
166 });
167 Leadership newLeadership = leadership(topic);
168
169 if (!Objects.equal(oldLeadership, newLeadership)) {
170 notifyLeadershipChange(oldLeadership, newLeadership);
171 }
172 return newLeadership;
173 } finally {
174 commit.close();
175 }
176 }
177
178 /**
179 * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
180 * @param commit withdraw commit
181 */
Madan Jampanifc981772016-02-16 09:46:42 -0800182 public void withdraw(Commit<? extends Withdraw> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800183 try {
184 String topic = commit.operation().topic();
185 Leadership oldLeadership = leadership(topic);
186 elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
187 termCounter(topic)::incrementAndGet));
188 Leadership newLeadership = leadership(topic);
189 if (!Objects.equal(oldLeadership, newLeadership)) {
190 notifyLeadershipChange(oldLeadership, newLeadership);
191 }
192 } finally {
193 commit.close();
194 }
195 }
196
197 /**
198 * Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
199 * @param commit anoint commit
200 * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
201 */
Madan Jampanifc981772016-02-16 09:46:42 -0800202 public boolean anoint(Commit<? extends Anoint> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800203 try {
204 String topic = commit.operation().topic();
Madan Jampanic94b4852016-02-23 18:18:37 -0800205 NodeId nodeId = commit.operation().nodeId();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800206 Leadership oldLeadership = leadership(topic);
207 ElectionState electionState = elections.computeIfPresent(topic,
Madan Jampanic94b4852016-02-23 18:18:37 -0800208 (k, v) -> v.transferLeadership(nodeId, termCounter(topic)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800209 Leadership newLeadership = leadership(topic);
210 if (!Objects.equal(oldLeadership, newLeadership)) {
211 notifyLeadershipChange(oldLeadership, newLeadership);
212 }
213 return (electionState != null &&
214 electionState.leader() != null &&
215 commit.operation().nodeId().equals(electionState.leader().nodeId()));
216 } finally {
217 commit.close();
218 }
219 }
220
221 /**
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800222 * Applies an {@link AtomixLeaderElectorCommands.Promote} commit.
223 * @param commit promote commit
224 * @return {@code true} if changes desired end state is achieved.
225 */
226 public boolean promote(Commit<? extends Promote> commit) {
227 try {
228 String topic = commit.operation().topic();
229 NodeId nodeId = commit.operation().nodeId();
230 Leadership oldLeadership = leadership(topic);
231 if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
232 return false;
233 }
Madan Jampanic94b4852016-02-23 18:18:37 -0800234 elections.computeIfPresent(topic, (k, v) -> v.promote(nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800235 Leadership newLeadership = leadership(topic);
236 if (!Objects.equal(oldLeadership, newLeadership)) {
237 notifyLeadershipChange(oldLeadership, newLeadership);
238 }
239 return true;
240 } finally {
241 commit.close();
242 }
243 }
244
245 /**
246 * Applies an {@link AtomixLeaderElectorCommands.Evict} commit.
247 * @param commit evict commit
248 */
249 public void evict(Commit<? extends Evict> commit) {
250 try {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800251 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800252 NodeId nodeId = commit.operation().nodeId();
253 Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
254 topics.forEach(topic -> {
255 Leadership oldLeadership = leadership(topic);
256 elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
257 Leadership newLeadership = leadership(topic);
258 if (!Objects.equal(oldLeadership, newLeadership)) {
259 changes.add(new Change<>(oldLeadership, newLeadership));
260 }
261 });
262 notifyLeadershipChanges(changes);
263 } finally {
264 commit.close();
265 }
266 }
267
268 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800269 * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
270 * @param commit GetLeadership commit
271 * @return leader
272 */
Madan Jampanifc981772016-02-16 09:46:42 -0800273 public Leadership leadership(Commit<? extends GetLeadership> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800274 String topic = commit.operation().topic();
275 try {
276 return leadership(topic);
277 } finally {
278 commit.close();
279 }
280 }
281
282 /**
283 * Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
284 * @param commit commit entry
285 * @return set of topics for which the node is the leader
286 */
Madan Jampanifc981772016-02-16 09:46:42 -0800287 public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800288 try {
289 NodeId nodeId = commit.operation().nodeId();
290 return Maps.filterEntries(elections, e -> {
291 Leader leader = leadership(e.getKey()).leader();
292 return leader != null && leader.nodeId().equals(nodeId);
293 }).keySet();
294 } finally {
295 commit.close();
296 }
297 }
298
299 /**
300 * Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
301 * @param commit GetAllLeaderships commit
302 * @return topic to leader mapping
303 */
Madan Jampanifc981772016-02-16 09:46:42 -0800304 public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800305 try {
306 return Maps.transformEntries(elections, (k, v) -> leadership(k));
307 } finally {
308 commit.close();
309 }
310 }
311
312 private Leadership leadership(String topic) {
313 return new Leadership(topic,
314 leader(topic),
315 candidates(topic));
316 }
317
318 private Leader leader(String topic) {
319 ElectionState electionState = elections.get(topic);
320 return electionState == null ? null : electionState.leader();
321 }
322
323 private List<NodeId> candidates(String topic) {
324 ElectionState electionState = elections.get(topic);
325 return electionState == null ? new LinkedList<>() : electionState.candidates();
326 }
327
Madan Jampani3a9911c2016-02-21 11:25:45 -0800328 private void onSessionEnd(ServerSession session) {
Madan Jampani86cb2432016-02-17 11:07:56 -0800329 Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800330 if (listener != null) {
331 listener.close();
332 }
333 Set<String> topics = elections.keySet();
Madan Jampanidb2afd32016-02-23 16:26:45 -0800334 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800335 topics.forEach(topic -> {
336 Leadership oldLeadership = leadership(topic);
337 elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
338 Leadership newLeadership = leadership(topic);
339 if (!Objects.equal(oldLeadership, newLeadership)) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800340 changes.add(new Change<>(oldLeadership, newLeadership));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800341 }
342 });
Madan Jampanidb2afd32016-02-23 16:26:45 -0800343 notifyLeadershipChanges(changes);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800344 }
345
346 private static class Registration {
347 private final NodeId nodeId;
348 private final long sessionId;
349
350 public Registration(NodeId nodeId, long sessionId) {
351 this.nodeId = nodeId;
352 this.sessionId = sessionId;
353 }
354
355 public NodeId nodeId() {
356 return nodeId;
357 }
358
359 public long sessionId() {
360 return sessionId;
361 }
362
363 @Override
364 public String toString() {
365 return MoreObjects.toStringHelper(getClass())
366 .add("nodeId", nodeId)
367 .add("sessionId", sessionId)
368 .toString();
369 }
370 }
371
372 private static class ElectionState {
373 final Registration leader;
374 final long term;
375 final long termStartTime;
376 final List<Registration> registrations;
377
378 public ElectionState(Registration registration, Supplier<Long> termCounter) {
379 registrations = Arrays.asList(registration);
380 term = termCounter.get();
381 termStartTime = System.currentTimeMillis();
382 leader = registration;
383 }
384
385 public ElectionState(ElectionState other) {
386 registrations = Lists.newArrayList(other.registrations);
387 leader = other.leader;
388 term = other.term;
389 termStartTime = other.termStartTime;
390 }
391
392 public ElectionState(List<Registration> registrations,
393 Registration leader,
394 long term,
395 long termStartTime) {
396 this.registrations = Lists.newArrayList(registrations);
397 this.leader = leader;
398 this.term = term;
399 this.termStartTime = termStartTime;
400 }
401
Madan Jampani3a9911c2016-02-21 11:25:45 -0800402 public ElectionState cleanup(ServerSession session, Supplier<Long> termCounter) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800403 Optional<Registration> registration =
404 registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
405 if (registration.isPresent()) {
406 List<Registration> updatedRegistrations =
407 registrations.stream()
408 .filter(r -> r.sessionId() != session.id())
409 .collect(Collectors.toList());
410 if (leader.sessionId() == session.id()) {
411 if (updatedRegistrations.size() > 0) {
412 return new ElectionState(updatedRegistrations,
413 updatedRegistrations.get(0),
414 termCounter.get(),
415 System.currentTimeMillis());
416 } else {
417 return new ElectionState(updatedRegistrations, null, term, termStartTime);
418 }
419 } else {
420 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
421 }
422 } else {
423 return this;
424 }
425 }
426
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800427 public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
428 Optional<Registration> registration =
429 registrations.stream().filter(r -> r.nodeId.equals(nodeId)).findFirst();
430 if (registration.isPresent()) {
431 List<Registration> updatedRegistrations =
432 registrations.stream()
433 .filter(r -> !r.nodeId().equals(nodeId))
434 .collect(Collectors.toList());
435 if (leader.nodeId().equals(nodeId)) {
436 if (updatedRegistrations.size() > 0) {
437 return new ElectionState(updatedRegistrations,
438 updatedRegistrations.get(0),
439 termCounter.get(),
440 System.currentTimeMillis());
441 } else {
442 return new ElectionState(updatedRegistrations, null, term, termStartTime);
443 }
444 } else {
445 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
446 }
447 } else {
448 return this;
449 }
450 }
451
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800452 public boolean isDuplicate(Registration registration) {
453 return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
454 }
455
456 public Leader leader() {
457 if (leader == null) {
458 return null;
459 } else {
460 NodeId leaderNodeId = leader.nodeId();
461 return new Leader(leaderNodeId, term, termStartTime);
462 }
463 }
464
465 public List<NodeId> candidates() {
466 return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
467 }
468
469 public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
470 if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
471 List<Registration> updatedRegistrations = new LinkedList<>(registrations);
472 updatedRegistrations.add(registration);
473 boolean newLeader = leader == null;
474 return new ElectionState(updatedRegistrations,
475 newLeader ? registration : leader,
476 newLeader ? termCounter.get() : term,
477 newLeader ? System.currentTimeMillis() : termStartTime);
478 }
479 return this;
480 }
481
482 public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
483 Registration newLeader = registrations.stream()
484 .filter(r -> r.nodeId().equals(nodeId))
485 .findFirst()
486 .orElse(null);
487 if (newLeader != null) {
488 return new ElectionState(registrations,
489 newLeader,
490 termCounter.incrementAndGet(),
491 System.currentTimeMillis());
492 } else {
493 return this;
494 }
495 }
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800496
497 public ElectionState promote(NodeId nodeId) {
498 Registration registration = registrations.stream()
499 .filter(r -> r.nodeId().equals(nodeId))
500 .findFirst()
501 .orElse(null);
Madan Jampanic94b4852016-02-23 18:18:37 -0800502 List<Registration> updatedRegistrations = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800503 updatedRegistrations.add(registration);
504 registrations.stream()
505 .filter(r -> !r.nodeId().equals(nodeId))
506 .forEach(updatedRegistrations::add);
507 return new ElectionState(updatedRegistrations,
508 leader,
509 term,
510 termStartTime);
511
512 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800513 }
514
515 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800516 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800517 }
518
519 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800520 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800521 onSessionEnd(session);
522 }
523
524 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800525 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800526 onSessionEnd(session);
527 }
528
529 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800530 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800531 onSessionEnd(session);
532 }
533
534 @Override
535 public void snapshot(SnapshotWriter writer) {
536 byte[] encodedTermCounters = serializer.encode(termCounters);
537 writer.writeInt(encodedTermCounters.length);
538 writer.write(encodedTermCounters);
539 byte[] encodedElections = serializer.encode(elections);
540 writer.writeInt(encodedElections.length);
541 writer.write(encodedElections);
542 log.info("Took state machine snapshot");
543 }
544
545 @Override
546 public void install(SnapshotReader reader) {
547 int encodedTermCountersSize = reader.readInt();
548 byte[] encodedTermCounters = new byte[encodedTermCountersSize];
549 reader.read(encodedTermCounters);
550 termCounters = serializer.decode(encodedTermCounters);
551 int encodedElectionsSize = reader.readInt();
552 byte[] encodedElections = new byte[encodedElectionsSize];
553 reader.read(encodedElections);
554 elections = serializer.decode(encodedElections);
555 log.info("Reinstated state machine from snapshot");
556 }
557
558 private AtomicLong termCounter(String topic) {
559 return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
560 }
561}