blob: 894cc791cc11b32d427e5c43463cf4d5cbded37d [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani3a9911c2016-02-21 11:25:45 -080019import io.atomix.copycat.server.session.ServerSession;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.copycat.server.Commit;
21import io.atomix.copycat.server.Snapshottable;
22import io.atomix.copycat.server.StateMachineExecutor;
23import io.atomix.copycat.server.session.SessionListener;
24import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
25import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
26import io.atomix.resource.ResourceStateMachine;
27
28import java.util.Arrays;
29import java.util.HashMap;
30import java.util.LinkedHashMap;
31import java.util.LinkedList;
32import java.util.List;
33import java.util.Map;
34import java.util.Optional;
Madan Jampani65f24bb2016-03-15 15:16:18 -070035import java.util.Properties;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080036import java.util.Set;
37import java.util.concurrent.atomic.AtomicLong;
38import java.util.function.Supplier;
39import java.util.stream.Collectors;
40
41import org.onosproject.cluster.Leader;
42import org.onosproject.cluster.Leadership;
43import org.onosproject.cluster.NodeId;
44import org.onosproject.event.Change;
Madan Jampanifc981772016-02-16 09:46:42 -080045import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080046import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Evict;
Madan Jampanifc981772016-02-16 09:46:42 -080047import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
48import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
49import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
50import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080051import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
Madan Jampanifc981772016-02-16 09:46:42 -080052import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
53import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
54import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055import org.onosproject.store.serializers.KryoNamespaces;
56import org.onosproject.store.service.Serializer;
57import org.slf4j.Logger;
58
59import com.google.common.base.MoreObjects;
60import com.google.common.base.Objects;
61import com.google.common.collect.Lists;
62import com.google.common.collect.Maps;
63
64/**
65 * State machine for {@link AtomixLeaderElector} resource.
66 */
67public class AtomixLeaderElectorState extends ResourceStateMachine
68 implements SessionListener, Snapshottable {
69
70 private final Logger log = getLogger(getClass());
71 private Map<String, AtomicLong> termCounters = new HashMap<>();
72 private Map<String, ElectionState> elections = new HashMap<>();
Madan Jampanifc981772016-02-16 09:46:42 -080073 private final Map<Long, Commit<? extends Listen>> listeners = new LinkedHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080074 private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
75 ElectionState.class,
76 Registration.class);
77
Madan Jampani65f24bb2016-03-15 15:16:18 -070078 public AtomixLeaderElectorState(Properties properties) {
79 super(properties);
Madan Jampani86cb2432016-02-17 11:07:56 -080080 }
81
Madan Jampani5e5b3d62016-02-01 16:03:33 -080082 @Override
83 protected void configure(StateMachineExecutor executor) {
84 // Notification
Madan Jampanifc981772016-02-16 09:46:42 -080085 executor.register(Listen.class, this::listen);
86 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080087 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -080088 executor.register(Run.class, this::run);
89 executor.register(Withdraw.class, this::withdraw);
90 executor.register(Anoint.class, this::anoint);
Madan Jampani0c0cdc62016-02-22 16:54:06 -080091 executor.register(Promote.class, this::promote);
92 executor.register(Evict.class, this::evict);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080093 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -080094 executor.register(GetLeadership.class, this::leadership);
95 executor.register(GetAllLeaderships.class, this::allLeaderships);
96 executor.register(GetElectedTopics.class, this::electedTopics);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080097 }
98
99 private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800100 notifyLeadershipChanges(Lists.newArrayList(new Change<>(previousLeadership, newLeadership)));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800101 }
102
103 private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
104 if (changes.isEmpty()) {
105 return;
106 }
107 listeners.values()
108 .forEach(listener -> listener.session()
109 .publish(AtomixLeaderElector.CHANGE_SUBJECT, changes));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800110 }
111
112 @Override
113 public void delete() {
114 // Close and clear Listeners
115 listeners.values().forEach(Commit::close);
116 listeners.clear();
117 }
118
119 /**
120 * Applies listen commits.
121 *
122 * @param commit listen commit
123 */
Madan Jampanifc981772016-02-16 09:46:42 -0800124 public void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800125 if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
126 commit.close();
127 }
128 }
129
130 /**
131 * Applies unlisten commits.
132 *
133 * @param commit unlisten commit
134 */
Madan Jampanifc981772016-02-16 09:46:42 -0800135 public void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800136 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800137 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800138 if (listener != null) {
139 listener.close();
140 }
141 } finally {
142 commit.close();
143 }
144 }
145
146 /**
147 * Applies an {@link AtomixLeaderElectorCommands.Run} commit.
148 * @param commit commit entry
149 * @return topic leader. If no previous leader existed this is the node that just entered the race.
150 */
Madan Jampanifc981772016-02-16 09:46:42 -0800151 public Leadership run(Commit<? extends Run> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800152 try {
153 String topic = commit.operation().topic();
154 Leadership oldLeadership = leadership(topic);
155 Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
156 elections.compute(topic, (k, v) -> {
157 if (v == null) {
158 return new ElectionState(registration, termCounter(topic)::incrementAndGet);
159 } else {
160 if (!v.isDuplicate(registration)) {
161 return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
162 } else {
163 return v;
164 }
165 }
166 });
167 Leadership newLeadership = leadership(topic);
168
169 if (!Objects.equal(oldLeadership, newLeadership)) {
170 notifyLeadershipChange(oldLeadership, newLeadership);
171 }
172 return newLeadership;
173 } finally {
174 commit.close();
175 }
176 }
177
178 /**
179 * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
180 * @param commit withdraw commit
181 */
Madan Jampanifc981772016-02-16 09:46:42 -0800182 public void withdraw(Commit<? extends Withdraw> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800183 try {
184 String topic = commit.operation().topic();
185 Leadership oldLeadership = leadership(topic);
186 elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
187 termCounter(topic)::incrementAndGet));
188 Leadership newLeadership = leadership(topic);
189 if (!Objects.equal(oldLeadership, newLeadership)) {
190 notifyLeadershipChange(oldLeadership, newLeadership);
191 }
192 } finally {
193 commit.close();
194 }
195 }
196
197 /**
198 * Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
199 * @param commit anoint commit
200 * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
201 */
Madan Jampanifc981772016-02-16 09:46:42 -0800202 public boolean anoint(Commit<? extends Anoint> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800203 try {
204 String topic = commit.operation().topic();
Madan Jampanic94b4852016-02-23 18:18:37 -0800205 NodeId nodeId = commit.operation().nodeId();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800206 Leadership oldLeadership = leadership(topic);
207 ElectionState electionState = elections.computeIfPresent(topic,
Madan Jampanic94b4852016-02-23 18:18:37 -0800208 (k, v) -> v.transferLeadership(nodeId, termCounter(topic)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800209 Leadership newLeadership = leadership(topic);
210 if (!Objects.equal(oldLeadership, newLeadership)) {
211 notifyLeadershipChange(oldLeadership, newLeadership);
212 }
213 return (electionState != null &&
214 electionState.leader() != null &&
215 commit.operation().nodeId().equals(electionState.leader().nodeId()));
216 } finally {
217 commit.close();
218 }
219 }
220
221 /**
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800222 * Applies an {@link AtomixLeaderElectorCommands.Promote} commit.
223 * @param commit promote commit
224 * @return {@code true} if changes desired end state is achieved.
225 */
226 public boolean promote(Commit<? extends Promote> commit) {
227 try {
228 String topic = commit.operation().topic();
229 NodeId nodeId = commit.operation().nodeId();
230 Leadership oldLeadership = leadership(topic);
231 if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
232 return false;
233 }
Madan Jampanic94b4852016-02-23 18:18:37 -0800234 elections.computeIfPresent(topic, (k, v) -> v.promote(nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800235 Leadership newLeadership = leadership(topic);
236 if (!Objects.equal(oldLeadership, newLeadership)) {
237 notifyLeadershipChange(oldLeadership, newLeadership);
238 }
239 return true;
240 } finally {
241 commit.close();
242 }
243 }
244
245 /**
246 * Applies an {@link AtomixLeaderElectorCommands.Evict} commit.
247 * @param commit evict commit
248 */
249 public void evict(Commit<? extends Evict> commit) {
250 try {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800251 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800252 NodeId nodeId = commit.operation().nodeId();
253 Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
254 topics.forEach(topic -> {
255 Leadership oldLeadership = leadership(topic);
256 elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
257 Leadership newLeadership = leadership(topic);
258 if (!Objects.equal(oldLeadership, newLeadership)) {
259 changes.add(new Change<>(oldLeadership, newLeadership));
260 }
261 });
262 notifyLeadershipChanges(changes);
263 } finally {
264 commit.close();
265 }
266 }
267
268 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800269 * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
270 * @param commit GetLeadership commit
271 * @return leader
272 */
Madan Jampanifc981772016-02-16 09:46:42 -0800273 public Leadership leadership(Commit<? extends GetLeadership> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800274 String topic = commit.operation().topic();
275 try {
276 return leadership(topic);
277 } finally {
278 commit.close();
279 }
280 }
281
282 /**
283 * Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
284 * @param commit commit entry
285 * @return set of topics for which the node is the leader
286 */
Madan Jampanifc981772016-02-16 09:46:42 -0800287 public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800288 try {
289 NodeId nodeId = commit.operation().nodeId();
290 return Maps.filterEntries(elections, e -> {
291 Leader leader = leadership(e.getKey()).leader();
292 return leader != null && leader.nodeId().equals(nodeId);
293 }).keySet();
294 } finally {
295 commit.close();
296 }
297 }
298
299 /**
300 * Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
301 * @param commit GetAllLeaderships commit
302 * @return topic to leader mapping
303 */
Madan Jampanifc981772016-02-16 09:46:42 -0800304 public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700305 Map<String, Leadership> result = new HashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800306 try {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700307 result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
308 return result;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800309 } finally {
310 commit.close();
311 }
312 }
313
314 private Leadership leadership(String topic) {
315 return new Leadership(topic,
316 leader(topic),
317 candidates(topic));
318 }
319
320 private Leader leader(String topic) {
321 ElectionState electionState = elections.get(topic);
322 return electionState == null ? null : electionState.leader();
323 }
324
325 private List<NodeId> candidates(String topic) {
326 ElectionState electionState = elections.get(topic);
327 return electionState == null ? new LinkedList<>() : electionState.candidates();
328 }
329
Madan Jampani3a9911c2016-02-21 11:25:45 -0800330 private void onSessionEnd(ServerSession session) {
Madan Jampani86cb2432016-02-17 11:07:56 -0800331 Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800332 if (listener != null) {
333 listener.close();
334 }
335 Set<String> topics = elections.keySet();
Madan Jampanidb2afd32016-02-23 16:26:45 -0800336 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800337 topics.forEach(topic -> {
338 Leadership oldLeadership = leadership(topic);
339 elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
340 Leadership newLeadership = leadership(topic);
341 if (!Objects.equal(oldLeadership, newLeadership)) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800342 changes.add(new Change<>(oldLeadership, newLeadership));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800343 }
344 });
Madan Jampanidb2afd32016-02-23 16:26:45 -0800345 notifyLeadershipChanges(changes);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800346 }
347
348 private static class Registration {
349 private final NodeId nodeId;
350 private final long sessionId;
351
352 public Registration(NodeId nodeId, long sessionId) {
353 this.nodeId = nodeId;
354 this.sessionId = sessionId;
355 }
356
357 public NodeId nodeId() {
358 return nodeId;
359 }
360
361 public long sessionId() {
362 return sessionId;
363 }
364
365 @Override
366 public String toString() {
367 return MoreObjects.toStringHelper(getClass())
368 .add("nodeId", nodeId)
369 .add("sessionId", sessionId)
370 .toString();
371 }
372 }
373
374 private static class ElectionState {
375 final Registration leader;
376 final long term;
377 final long termStartTime;
378 final List<Registration> registrations;
379
380 public ElectionState(Registration registration, Supplier<Long> termCounter) {
381 registrations = Arrays.asList(registration);
382 term = termCounter.get();
383 termStartTime = System.currentTimeMillis();
384 leader = registration;
385 }
386
387 public ElectionState(ElectionState other) {
388 registrations = Lists.newArrayList(other.registrations);
389 leader = other.leader;
390 term = other.term;
391 termStartTime = other.termStartTime;
392 }
393
394 public ElectionState(List<Registration> registrations,
395 Registration leader,
396 long term,
397 long termStartTime) {
398 this.registrations = Lists.newArrayList(registrations);
399 this.leader = leader;
400 this.term = term;
401 this.termStartTime = termStartTime;
402 }
403
Madan Jampani3a9911c2016-02-21 11:25:45 -0800404 public ElectionState cleanup(ServerSession session, Supplier<Long> termCounter) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800405 Optional<Registration> registration =
406 registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
407 if (registration.isPresent()) {
408 List<Registration> updatedRegistrations =
409 registrations.stream()
410 .filter(r -> r.sessionId() != session.id())
411 .collect(Collectors.toList());
412 if (leader.sessionId() == session.id()) {
413 if (updatedRegistrations.size() > 0) {
414 return new ElectionState(updatedRegistrations,
415 updatedRegistrations.get(0),
416 termCounter.get(),
417 System.currentTimeMillis());
418 } else {
419 return new ElectionState(updatedRegistrations, null, term, termStartTime);
420 }
421 } else {
422 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
423 }
424 } else {
425 return this;
426 }
427 }
428
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800429 public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
430 Optional<Registration> registration =
431 registrations.stream().filter(r -> r.nodeId.equals(nodeId)).findFirst();
432 if (registration.isPresent()) {
433 List<Registration> updatedRegistrations =
434 registrations.stream()
435 .filter(r -> !r.nodeId().equals(nodeId))
436 .collect(Collectors.toList());
437 if (leader.nodeId().equals(nodeId)) {
438 if (updatedRegistrations.size() > 0) {
439 return new ElectionState(updatedRegistrations,
440 updatedRegistrations.get(0),
441 termCounter.get(),
442 System.currentTimeMillis());
443 } else {
444 return new ElectionState(updatedRegistrations, null, term, termStartTime);
445 }
446 } else {
447 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
448 }
449 } else {
450 return this;
451 }
452 }
453
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800454 public boolean isDuplicate(Registration registration) {
455 return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
456 }
457
458 public Leader leader() {
459 if (leader == null) {
460 return null;
461 } else {
462 NodeId leaderNodeId = leader.nodeId();
463 return new Leader(leaderNodeId, term, termStartTime);
464 }
465 }
466
467 public List<NodeId> candidates() {
468 return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
469 }
470
471 public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
472 if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
473 List<Registration> updatedRegistrations = new LinkedList<>(registrations);
474 updatedRegistrations.add(registration);
475 boolean newLeader = leader == null;
476 return new ElectionState(updatedRegistrations,
477 newLeader ? registration : leader,
478 newLeader ? termCounter.get() : term,
479 newLeader ? System.currentTimeMillis() : termStartTime);
480 }
481 return this;
482 }
483
484 public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
485 Registration newLeader = registrations.stream()
486 .filter(r -> r.nodeId().equals(nodeId))
487 .findFirst()
488 .orElse(null);
489 if (newLeader != null) {
490 return new ElectionState(registrations,
491 newLeader,
492 termCounter.incrementAndGet(),
493 System.currentTimeMillis());
494 } else {
495 return this;
496 }
497 }
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800498
499 public ElectionState promote(NodeId nodeId) {
500 Registration registration = registrations.stream()
501 .filter(r -> r.nodeId().equals(nodeId))
502 .findFirst()
503 .orElse(null);
Madan Jampanic94b4852016-02-23 18:18:37 -0800504 List<Registration> updatedRegistrations = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800505 updatedRegistrations.add(registration);
506 registrations.stream()
507 .filter(r -> !r.nodeId().equals(nodeId))
508 .forEach(updatedRegistrations::add);
509 return new ElectionState(updatedRegistrations,
510 leader,
511 term,
512 termStartTime);
513
514 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800515 }
516
517 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800518 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800519 }
520
521 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800522 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800523 onSessionEnd(session);
524 }
525
526 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800527 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800528 onSessionEnd(session);
529 }
530
531 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800532 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800533 onSessionEnd(session);
534 }
535
536 @Override
537 public void snapshot(SnapshotWriter writer) {
538 byte[] encodedTermCounters = serializer.encode(termCounters);
539 writer.writeInt(encodedTermCounters.length);
540 writer.write(encodedTermCounters);
541 byte[] encodedElections = serializer.encode(elections);
542 writer.writeInt(encodedElections.length);
543 writer.write(encodedElections);
Madan Jampani630e7ac2016-05-31 11:34:05 -0700544 log.debug("Took state machine snapshot");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800545 }
546
547 @Override
548 public void install(SnapshotReader reader) {
549 int encodedTermCountersSize = reader.readInt();
550 byte[] encodedTermCounters = new byte[encodedTermCountersSize];
551 reader.read(encodedTermCounters);
552 termCounters = serializer.decode(encodedTermCounters);
553 int encodedElectionsSize = reader.readInt();
554 byte[] encodedElections = new byte[encodedElectionsSize];
555 reader.read(encodedElections);
556 elections = serializer.decode(encodedElections);
Madan Jampani630e7ac2016-05-31 11:34:05 -0700557 log.debug("Reinstated state machine from snapshot");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800558 }
559
560 private AtomicLong termCounter(String topic) {
561 return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
562 }
563}