/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.primitives.resources.impl;

import static org.slf4j.LoggerFactory.getLogger;

import com.google.common.collect.ImmutableSet;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Evict;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

66/**
67 * State machine for {@link AtomixLeaderElector} resource.
68 */
69public class AtomixLeaderElectorState extends ResourceStateMachine
70 implements SessionListener, Snapshottable {
71
72 private final Logger log = getLogger(getClass());
73 private Map<String, AtomicLong> termCounters = new HashMap<>();
74 private Map<String, ElectionState> elections = new HashMap<>();
Madan Jampanifc981772016-02-16 09:46:42 -080075 private final Map<Long, Commit<? extends Listen>> listeners = new LinkedHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080076 private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
77 ElectionState.class,
78 Registration.class);
79
Madan Jampani65f24bb2016-03-15 15:16:18 -070080 public AtomixLeaderElectorState(Properties properties) {
81 super(properties);
Madan Jampani86cb2432016-02-17 11:07:56 -080082 }
83
Madan Jampani5e5b3d62016-02-01 16:03:33 -080084 @Override
85 protected void configure(StateMachineExecutor executor) {
86 // Notification
Madan Jampanifc981772016-02-16 09:46:42 -080087 executor.register(Listen.class, this::listen);
88 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080089 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -080090 executor.register(Run.class, this::run);
91 executor.register(Withdraw.class, this::withdraw);
92 executor.register(Anoint.class, this::anoint);
Madan Jampani0c0cdc62016-02-22 16:54:06 -080093 executor.register(Promote.class, this::promote);
94 executor.register(Evict.class, this::evict);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080095 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -080096 executor.register(GetLeadership.class, this::leadership);
97 executor.register(GetAllLeaderships.class, this::allLeaderships);
98 executor.register(GetElectedTopics.class, this::electedTopics);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080099 }
100
101 private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800102 notifyLeadershipChanges(Lists.newArrayList(new Change<>(previousLeadership, newLeadership)));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800103 }
104
105 private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
106 if (changes.isEmpty()) {
107 return;
108 }
109 listeners.values()
110 .forEach(listener -> listener.session()
111 .publish(AtomixLeaderElector.CHANGE_SUBJECT, changes));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800112 }
113
114 @Override
115 public void delete() {
116 // Close and clear Listeners
117 listeners.values().forEach(Commit::close);
118 listeners.clear();
119 }
120
121 /**
122 * Applies listen commits.
123 *
124 * @param commit listen commit
125 */
Madan Jampanifc981772016-02-16 09:46:42 -0800126 public void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800127 if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
128 commit.close();
129 }
130 }
131
132 /**
133 * Applies unlisten commits.
134 *
135 * @param commit unlisten commit
136 */
Madan Jampanifc981772016-02-16 09:46:42 -0800137 public void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800138 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800139 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800140 if (listener != null) {
141 listener.close();
142 }
143 } finally {
144 commit.close();
145 }
146 }
147
148 /**
149 * Applies an {@link AtomixLeaderElectorCommands.Run} commit.
150 * @param commit commit entry
151 * @return topic leader. If no previous leader existed this is the node that just entered the race.
152 */
Madan Jampanifc981772016-02-16 09:46:42 -0800153 public Leadership run(Commit<? extends Run> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800154 try {
155 String topic = commit.operation().topic();
156 Leadership oldLeadership = leadership(topic);
157 Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
158 elections.compute(topic, (k, v) -> {
159 if (v == null) {
160 return new ElectionState(registration, termCounter(topic)::incrementAndGet);
161 } else {
162 if (!v.isDuplicate(registration)) {
163 return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
164 } else {
165 return v;
166 }
167 }
168 });
169 Leadership newLeadership = leadership(topic);
170
171 if (!Objects.equal(oldLeadership, newLeadership)) {
172 notifyLeadershipChange(oldLeadership, newLeadership);
173 }
174 return newLeadership;
175 } finally {
176 commit.close();
177 }
178 }
179
180 /**
181 * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
182 * @param commit withdraw commit
183 */
Madan Jampanifc981772016-02-16 09:46:42 -0800184 public void withdraw(Commit<? extends Withdraw> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800185 try {
186 String topic = commit.operation().topic();
187 Leadership oldLeadership = leadership(topic);
188 elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
189 termCounter(topic)::incrementAndGet));
190 Leadership newLeadership = leadership(topic);
191 if (!Objects.equal(oldLeadership, newLeadership)) {
192 notifyLeadershipChange(oldLeadership, newLeadership);
193 }
194 } finally {
195 commit.close();
196 }
197 }
198
199 /**
200 * Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
201 * @param commit anoint commit
202 * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
203 */
Madan Jampanifc981772016-02-16 09:46:42 -0800204 public boolean anoint(Commit<? extends Anoint> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800205 try {
206 String topic = commit.operation().topic();
Madan Jampanic94b4852016-02-23 18:18:37 -0800207 NodeId nodeId = commit.operation().nodeId();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800208 Leadership oldLeadership = leadership(topic);
209 ElectionState electionState = elections.computeIfPresent(topic,
Madan Jampanic94b4852016-02-23 18:18:37 -0800210 (k, v) -> v.transferLeadership(nodeId, termCounter(topic)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800211 Leadership newLeadership = leadership(topic);
212 if (!Objects.equal(oldLeadership, newLeadership)) {
213 notifyLeadershipChange(oldLeadership, newLeadership);
214 }
215 return (electionState != null &&
216 electionState.leader() != null &&
217 commit.operation().nodeId().equals(electionState.leader().nodeId()));
218 } finally {
219 commit.close();
220 }
221 }
222
223 /**
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800224 * Applies an {@link AtomixLeaderElectorCommands.Promote} commit.
225 * @param commit promote commit
226 * @return {@code true} if changes desired end state is achieved.
227 */
228 public boolean promote(Commit<? extends Promote> commit) {
229 try {
230 String topic = commit.operation().topic();
231 NodeId nodeId = commit.operation().nodeId();
232 Leadership oldLeadership = leadership(topic);
233 if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
234 return false;
235 }
Madan Jampanic94b4852016-02-23 18:18:37 -0800236 elections.computeIfPresent(topic, (k, v) -> v.promote(nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800237 Leadership newLeadership = leadership(topic);
238 if (!Objects.equal(oldLeadership, newLeadership)) {
239 notifyLeadershipChange(oldLeadership, newLeadership);
240 }
241 return true;
242 } finally {
243 commit.close();
244 }
245 }
246
247 /**
248 * Applies an {@link AtomixLeaderElectorCommands.Evict} commit.
249 * @param commit evict commit
250 */
251 public void evict(Commit<? extends Evict> commit) {
252 try {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800253 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800254 NodeId nodeId = commit.operation().nodeId();
255 Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
256 topics.forEach(topic -> {
257 Leadership oldLeadership = leadership(topic);
258 elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
259 Leadership newLeadership = leadership(topic);
260 if (!Objects.equal(oldLeadership, newLeadership)) {
261 changes.add(new Change<>(oldLeadership, newLeadership));
262 }
263 });
264 notifyLeadershipChanges(changes);
265 } finally {
266 commit.close();
267 }
268 }
269
270 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800271 * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
272 * @param commit GetLeadership commit
273 * @return leader
274 */
Madan Jampanifc981772016-02-16 09:46:42 -0800275 public Leadership leadership(Commit<? extends GetLeadership> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800276 String topic = commit.operation().topic();
277 try {
278 return leadership(topic);
279 } finally {
280 commit.close();
281 }
282 }
283
284 /**
285 * Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
286 * @param commit commit entry
287 * @return set of topics for which the node is the leader
288 */
Madan Jampanifc981772016-02-16 09:46:42 -0800289 public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800290 try {
291 NodeId nodeId = commit.operation().nodeId();
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700292 return ImmutableSet.copyOf(Maps.filterEntries(elections, e -> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800293 Leader leader = leadership(e.getKey()).leader();
294 return leader != null && leader.nodeId().equals(nodeId);
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700295 }).keySet());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800296 } finally {
297 commit.close();
298 }
299 }
300
301 /**
302 * Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
303 * @param commit GetAllLeaderships commit
304 * @return topic to leader mapping
305 */
Madan Jampanifc981772016-02-16 09:46:42 -0800306 public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700307 Map<String, Leadership> result = new HashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800308 try {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700309 result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
310 return result;
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800311 } finally {
312 commit.close();
313 }
314 }
315
316 private Leadership leadership(String topic) {
317 return new Leadership(topic,
318 leader(topic),
319 candidates(topic));
320 }
321
322 private Leader leader(String topic) {
323 ElectionState electionState = elections.get(topic);
324 return electionState == null ? null : electionState.leader();
325 }
326
327 private List<NodeId> candidates(String topic) {
328 ElectionState electionState = elections.get(topic);
329 return electionState == null ? new LinkedList<>() : electionState.candidates();
330 }
331
Madan Jampani3a9911c2016-02-21 11:25:45 -0800332 private void onSessionEnd(ServerSession session) {
Madan Jampani86cb2432016-02-17 11:07:56 -0800333 Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800334 if (listener != null) {
335 listener.close();
336 }
337 Set<String> topics = elections.keySet();
Madan Jampanidb2afd32016-02-23 16:26:45 -0800338 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800339 topics.forEach(topic -> {
340 Leadership oldLeadership = leadership(topic);
341 elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
342 Leadership newLeadership = leadership(topic);
343 if (!Objects.equal(oldLeadership, newLeadership)) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800344 changes.add(new Change<>(oldLeadership, newLeadership));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800345 }
346 });
Madan Jampanidb2afd32016-02-23 16:26:45 -0800347 notifyLeadershipChanges(changes);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800348 }
349
350 private static class Registration {
351 private final NodeId nodeId;
352 private final long sessionId;
353
354 public Registration(NodeId nodeId, long sessionId) {
355 this.nodeId = nodeId;
356 this.sessionId = sessionId;
357 }
358
359 public NodeId nodeId() {
360 return nodeId;
361 }
362
363 public long sessionId() {
364 return sessionId;
365 }
366
367 @Override
368 public String toString() {
369 return MoreObjects.toStringHelper(getClass())
370 .add("nodeId", nodeId)
371 .add("sessionId", sessionId)
372 .toString();
373 }
374 }
375
376 private static class ElectionState {
377 final Registration leader;
378 final long term;
379 final long termStartTime;
380 final List<Registration> registrations;
381
382 public ElectionState(Registration registration, Supplier<Long> termCounter) {
383 registrations = Arrays.asList(registration);
384 term = termCounter.get();
385 termStartTime = System.currentTimeMillis();
386 leader = registration;
387 }
388
389 public ElectionState(ElectionState other) {
390 registrations = Lists.newArrayList(other.registrations);
391 leader = other.leader;
392 term = other.term;
393 termStartTime = other.termStartTime;
394 }
395
396 public ElectionState(List<Registration> registrations,
397 Registration leader,
398 long term,
399 long termStartTime) {
400 this.registrations = Lists.newArrayList(registrations);
401 this.leader = leader;
402 this.term = term;
403 this.termStartTime = termStartTime;
404 }
405
Madan Jampani3a9911c2016-02-21 11:25:45 -0800406 public ElectionState cleanup(ServerSession session, Supplier<Long> termCounter) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800407 Optional<Registration> registration =
408 registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
409 if (registration.isPresent()) {
410 List<Registration> updatedRegistrations =
411 registrations.stream()
412 .filter(r -> r.sessionId() != session.id())
413 .collect(Collectors.toList());
414 if (leader.sessionId() == session.id()) {
415 if (updatedRegistrations.size() > 0) {
416 return new ElectionState(updatedRegistrations,
417 updatedRegistrations.get(0),
418 termCounter.get(),
419 System.currentTimeMillis());
420 } else {
421 return new ElectionState(updatedRegistrations, null, term, termStartTime);
422 }
423 } else {
424 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
425 }
426 } else {
427 return this;
428 }
429 }
430
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800431 public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
432 Optional<Registration> registration =
433 registrations.stream().filter(r -> r.nodeId.equals(nodeId)).findFirst();
434 if (registration.isPresent()) {
435 List<Registration> updatedRegistrations =
436 registrations.stream()
437 .filter(r -> !r.nodeId().equals(nodeId))
438 .collect(Collectors.toList());
439 if (leader.nodeId().equals(nodeId)) {
440 if (updatedRegistrations.size() > 0) {
441 return new ElectionState(updatedRegistrations,
442 updatedRegistrations.get(0),
443 termCounter.get(),
444 System.currentTimeMillis());
445 } else {
446 return new ElectionState(updatedRegistrations, null, term, termStartTime);
447 }
448 } else {
449 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
450 }
451 } else {
452 return this;
453 }
454 }
455
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800456 public boolean isDuplicate(Registration registration) {
457 return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
458 }
459
460 public Leader leader() {
461 if (leader == null) {
462 return null;
463 } else {
464 NodeId leaderNodeId = leader.nodeId();
465 return new Leader(leaderNodeId, term, termStartTime);
466 }
467 }
468
469 public List<NodeId> candidates() {
470 return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
471 }
472
473 public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
474 if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
475 List<Registration> updatedRegistrations = new LinkedList<>(registrations);
476 updatedRegistrations.add(registration);
477 boolean newLeader = leader == null;
478 return new ElectionState(updatedRegistrations,
479 newLeader ? registration : leader,
480 newLeader ? termCounter.get() : term,
481 newLeader ? System.currentTimeMillis() : termStartTime);
482 }
483 return this;
484 }
485
486 public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
487 Registration newLeader = registrations.stream()
488 .filter(r -> r.nodeId().equals(nodeId))
489 .findFirst()
490 .orElse(null);
491 if (newLeader != null) {
492 return new ElectionState(registrations,
493 newLeader,
494 termCounter.incrementAndGet(),
495 System.currentTimeMillis());
496 } else {
497 return this;
498 }
499 }
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800500
501 public ElectionState promote(NodeId nodeId) {
502 Registration registration = registrations.stream()
503 .filter(r -> r.nodeId().equals(nodeId))
504 .findFirst()
505 .orElse(null);
Madan Jampanic94b4852016-02-23 18:18:37 -0800506 List<Registration> updatedRegistrations = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800507 updatedRegistrations.add(registration);
508 registrations.stream()
509 .filter(r -> !r.nodeId().equals(nodeId))
510 .forEach(updatedRegistrations::add);
511 return new ElectionState(updatedRegistrations,
512 leader,
513 term,
514 termStartTime);
515
516 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800517 }
518
519 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800520 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800521 }
522
523 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800524 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800525 onSessionEnd(session);
526 }
527
528 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800529 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800530 onSessionEnd(session);
531 }
532
533 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800534 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800535 onSessionEnd(session);
536 }
537
538 @Override
539 public void snapshot(SnapshotWriter writer) {
540 byte[] encodedTermCounters = serializer.encode(termCounters);
541 writer.writeInt(encodedTermCounters.length);
542 writer.write(encodedTermCounters);
543 byte[] encodedElections = serializer.encode(elections);
544 writer.writeInt(encodedElections.length);
545 writer.write(encodedElections);
Madan Jampani630e7ac2016-05-31 11:34:05 -0700546 log.debug("Took state machine snapshot");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800547 }
548
549 @Override
550 public void install(SnapshotReader reader) {
551 int encodedTermCountersSize = reader.readInt();
552 byte[] encodedTermCounters = new byte[encodedTermCountersSize];
553 reader.read(encodedTermCounters);
554 termCounters = serializer.decode(encodedTermCounters);
555 int encodedElectionsSize = reader.readInt();
556 byte[] encodedElections = new byte[encodedElectionsSize];
557 reader.read(encodedElections);
558 elections = serializer.decode(encodedElections);
Madan Jampani630e7ac2016-05-31 11:34:05 -0700559 log.debug("Reinstated state machine from snapshot");
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800560 }
561
562 private AtomicLong termCounter(String topic) {
563 return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
564 }
565}