blob: 0621042327c1cf1411096a714481ddb75c98da65 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani3a9911c2016-02-21 11:25:45 -080019import io.atomix.copycat.server.session.ServerSession;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.copycat.server.Commit;
21import io.atomix.copycat.server.Snapshottable;
22import io.atomix.copycat.server.StateMachineExecutor;
23import io.atomix.copycat.server.session.SessionListener;
24import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
25import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
26import io.atomix.resource.ResourceStateMachine;
Madan Jampani86cb2432016-02-17 11:07:56 -080027import io.atomix.resource.ResourceType;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080028
29import java.util.Arrays;
30import java.util.HashMap;
31import java.util.LinkedHashMap;
32import java.util.LinkedList;
33import java.util.List;
34import java.util.Map;
35import java.util.Optional;
36import java.util.Set;
37import java.util.concurrent.atomic.AtomicLong;
38import java.util.function.Supplier;
39import java.util.stream.Collectors;
40
41import org.onosproject.cluster.Leader;
42import org.onosproject.cluster.Leadership;
43import org.onosproject.cluster.NodeId;
44import org.onosproject.event.Change;
Madan Jampanifc981772016-02-16 09:46:42 -080045import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080046import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Evict;
Madan Jampanifc981772016-02-16 09:46:42 -080047import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
48import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
49import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
50import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080051import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
Madan Jampanifc981772016-02-16 09:46:42 -080052import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
53import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
54import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055import org.onosproject.store.serializers.KryoNamespaces;
56import org.onosproject.store.service.Serializer;
57import org.slf4j.Logger;
58
59import com.google.common.base.MoreObjects;
60import com.google.common.base.Objects;
61import com.google.common.collect.Lists;
62import com.google.common.collect.Maps;
63
64/**
65 * State machine for {@link AtomixLeaderElector} resource.
66 */
67public class AtomixLeaderElectorState extends ResourceStateMachine
68 implements SessionListener, Snapshottable {
69
70 private final Logger log = getLogger(getClass());
71 private Map<String, AtomicLong> termCounters = new HashMap<>();
72 private Map<String, ElectionState> elections = new HashMap<>();
Madan Jampanifc981772016-02-16 09:46:42 -080073 private final Map<Long, Commit<? extends Listen>> listeners = new LinkedHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080074 private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
75 ElectionState.class,
76 Registration.class);
77
/**
 * Creates the state machine, passing the parent a {@link ResourceType}
 * built from the {@link AtomixLeaderElector} class.
 */
public AtomixLeaderElectorState() {
    super(new ResourceType(AtomixLeaderElector.class));
}
81
Madan Jampani5e5b3d62016-02-01 16:03:33 -080082 @Override
83 protected void configure(StateMachineExecutor executor) {
84 // Notification
Madan Jampanifc981772016-02-16 09:46:42 -080085 executor.register(Listen.class, this::listen);
86 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080087 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -080088 executor.register(Run.class, this::run);
89 executor.register(Withdraw.class, this::withdraw);
90 executor.register(Anoint.class, this::anoint);
Madan Jampani0c0cdc62016-02-22 16:54:06 -080091 executor.register(Promote.class, this::promote);
92 executor.register(Evict.class, this::evict);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080093 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -080094 executor.register(GetLeadership.class, this::leadership);
95 executor.register(GetAllLeaderships.class, this::allLeaderships);
96 executor.register(GetElectedTopics.class, this::electedTopics);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080097 }
98
99 private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800100 notifyLeadershipChanges(Lists.newArrayList(new Change<>(previousLeadership, newLeadership)));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800101 }
102
103 private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
104 if (changes.isEmpty()) {
105 return;
106 }
107 listeners.values()
108 .forEach(listener -> listener.session()
109 .publish(AtomixLeaderElector.CHANGE_SUBJECT, changes));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800110 }
111
112 @Override
113 public void delete() {
114 // Close and clear Listeners
115 listeners.values().forEach(Commit::close);
116 listeners.clear();
117 }
118
119 /**
120 * Applies listen commits.
121 *
122 * @param commit listen commit
123 */
Madan Jampanifc981772016-02-16 09:46:42 -0800124 public void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800125 if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
126 commit.close();
127 }
128 }
129
130 /**
131 * Applies unlisten commits.
132 *
133 * @param commit unlisten commit
134 */
Madan Jampanifc981772016-02-16 09:46:42 -0800135 public void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800136 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800137 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800138 if (listener != null) {
139 listener.close();
140 }
141 } finally {
142 commit.close();
143 }
144 }
145
146 /**
147 * Applies an {@link AtomixLeaderElectorCommands.Run} commit.
148 * @param commit commit entry
149 * @return topic leader. If no previous leader existed this is the node that just entered the race.
150 */
Madan Jampanifc981772016-02-16 09:46:42 -0800151 public Leadership run(Commit<? extends Run> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800152 try {
153 String topic = commit.operation().topic();
154 Leadership oldLeadership = leadership(topic);
155 Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
156 elections.compute(topic, (k, v) -> {
157 if (v == null) {
158 return new ElectionState(registration, termCounter(topic)::incrementAndGet);
159 } else {
160 if (!v.isDuplicate(registration)) {
161 return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
162 } else {
163 return v;
164 }
165 }
166 });
167 Leadership newLeadership = leadership(topic);
168
169 if (!Objects.equal(oldLeadership, newLeadership)) {
170 notifyLeadershipChange(oldLeadership, newLeadership);
171 }
172 return newLeadership;
173 } finally {
174 commit.close();
175 }
176 }
177
178 /**
179 * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
180 * @param commit withdraw commit
181 */
Madan Jampanifc981772016-02-16 09:46:42 -0800182 public void withdraw(Commit<? extends Withdraw> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800183 try {
184 String topic = commit.operation().topic();
185 Leadership oldLeadership = leadership(topic);
186 elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
187 termCounter(topic)::incrementAndGet));
188 Leadership newLeadership = leadership(topic);
189 if (!Objects.equal(oldLeadership, newLeadership)) {
190 notifyLeadershipChange(oldLeadership, newLeadership);
191 }
192 } finally {
193 commit.close();
194 }
195 }
196
197 /**
198 * Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
199 * @param commit anoint commit
200 * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
201 */
Madan Jampanifc981772016-02-16 09:46:42 -0800202 public boolean anoint(Commit<? extends Anoint> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800203 try {
204 String topic = commit.operation().topic();
205 Leadership oldLeadership = leadership(topic);
206 ElectionState electionState = elections.computeIfPresent(topic,
207 (k, v) -> new ElectionState(v).transferLeadership(commit.operation().nodeId(), termCounter(topic)));
208 Leadership newLeadership = leadership(topic);
209 if (!Objects.equal(oldLeadership, newLeadership)) {
210 notifyLeadershipChange(oldLeadership, newLeadership);
211 }
212 return (electionState != null &&
213 electionState.leader() != null &&
214 commit.operation().nodeId().equals(electionState.leader().nodeId()));
215 } finally {
216 commit.close();
217 }
218 }
219
220 /**
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800221 * Applies an {@link AtomixLeaderElectorCommands.Promote} commit.
222 * @param commit promote commit
223 * @return {@code true} if changes desired end state is achieved.
224 */
225 public boolean promote(Commit<? extends Promote> commit) {
226 try {
227 String topic = commit.operation().topic();
228 NodeId nodeId = commit.operation().nodeId();
229 Leadership oldLeadership = leadership(topic);
230 if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
231 return false;
232 }
233 elections.computeIfPresent(topic, (k, v) -> new ElectionState(v).promote(commit.operation().nodeId()));
234 Leadership newLeadership = leadership(topic);
235 if (!Objects.equal(oldLeadership, newLeadership)) {
236 notifyLeadershipChange(oldLeadership, newLeadership);
237 }
238 return true;
239 } finally {
240 commit.close();
241 }
242 }
243
244 /**
245 * Applies an {@link AtomixLeaderElectorCommands.Evict} commit.
246 * @param commit evict commit
247 */
248 public void evict(Commit<? extends Evict> commit) {
249 try {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800250 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800251 NodeId nodeId = commit.operation().nodeId();
252 Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
253 topics.forEach(topic -> {
254 Leadership oldLeadership = leadership(topic);
255 elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
256 Leadership newLeadership = leadership(topic);
257 if (!Objects.equal(oldLeadership, newLeadership)) {
258 changes.add(new Change<>(oldLeadership, newLeadership));
259 }
260 });
261 notifyLeadershipChanges(changes);
262 } finally {
263 commit.close();
264 }
265 }
266
267 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800268 * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
269 * @param commit GetLeadership commit
270 * @return leader
271 */
Madan Jampanifc981772016-02-16 09:46:42 -0800272 public Leadership leadership(Commit<? extends GetLeadership> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800273 String topic = commit.operation().topic();
274 try {
275 return leadership(topic);
276 } finally {
277 commit.close();
278 }
279 }
280
281 /**
282 * Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
283 * @param commit commit entry
284 * @return set of topics for which the node is the leader
285 */
Madan Jampanifc981772016-02-16 09:46:42 -0800286 public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800287 try {
288 NodeId nodeId = commit.operation().nodeId();
289 return Maps.filterEntries(elections, e -> {
290 Leader leader = leadership(e.getKey()).leader();
291 return leader != null && leader.nodeId().equals(nodeId);
292 }).keySet();
293 } finally {
294 commit.close();
295 }
296 }
297
298 /**
299 * Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
300 * @param commit GetAllLeaderships commit
301 * @return topic to leader mapping
302 */
Madan Jampanifc981772016-02-16 09:46:42 -0800303 public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800304 try {
305 return Maps.transformEntries(elections, (k, v) -> leadership(k));
306 } finally {
307 commit.close();
308 }
309 }
310
311 private Leadership leadership(String topic) {
312 return new Leadership(topic,
313 leader(topic),
314 candidates(topic));
315 }
316
317 private Leader leader(String topic) {
318 ElectionState electionState = elections.get(topic);
319 return electionState == null ? null : electionState.leader();
320 }
321
322 private List<NodeId> candidates(String topic) {
323 ElectionState electionState = elections.get(topic);
324 return electionState == null ? new LinkedList<>() : electionState.candidates();
325 }
326
Madan Jampani3a9911c2016-02-21 11:25:45 -0800327 private void onSessionEnd(ServerSession session) {
Madan Jampani86cb2432016-02-17 11:07:56 -0800328 Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800329 if (listener != null) {
330 listener.close();
331 }
332 Set<String> topics = elections.keySet();
Madan Jampanidb2afd32016-02-23 16:26:45 -0800333 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800334 topics.forEach(topic -> {
335 Leadership oldLeadership = leadership(topic);
336 elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
337 Leadership newLeadership = leadership(topic);
338 if (!Objects.equal(oldLeadership, newLeadership)) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800339 changes.add(new Change<>(oldLeadership, newLeadership));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800340 }
341 });
Madan Jampanidb2afd32016-02-23 16:26:45 -0800342 notifyLeadershipChanges(changes);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800343 }
344
345 private static class Registration {
346 private final NodeId nodeId;
347 private final long sessionId;
348
349 public Registration(NodeId nodeId, long sessionId) {
350 this.nodeId = nodeId;
351 this.sessionId = sessionId;
352 }
353
354 public NodeId nodeId() {
355 return nodeId;
356 }
357
358 public long sessionId() {
359 return sessionId;
360 }
361
362 @Override
363 public String toString() {
364 return MoreObjects.toStringHelper(getClass())
365 .add("nodeId", nodeId)
366 .add("sessionId", sessionId)
367 .toString();
368 }
369 }
370
371 private static class ElectionState {
372 final Registration leader;
373 final long term;
374 final long termStartTime;
375 final List<Registration> registrations;
376
377 public ElectionState(Registration registration, Supplier<Long> termCounter) {
378 registrations = Arrays.asList(registration);
379 term = termCounter.get();
380 termStartTime = System.currentTimeMillis();
381 leader = registration;
382 }
383
384 public ElectionState(ElectionState other) {
385 registrations = Lists.newArrayList(other.registrations);
386 leader = other.leader;
387 term = other.term;
388 termStartTime = other.termStartTime;
389 }
390
391 public ElectionState(List<Registration> registrations,
392 Registration leader,
393 long term,
394 long termStartTime) {
395 this.registrations = Lists.newArrayList(registrations);
396 this.leader = leader;
397 this.term = term;
398 this.termStartTime = termStartTime;
399 }
400
Madan Jampani3a9911c2016-02-21 11:25:45 -0800401 public ElectionState cleanup(ServerSession session, Supplier<Long> termCounter) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800402 Optional<Registration> registration =
403 registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
404 if (registration.isPresent()) {
405 List<Registration> updatedRegistrations =
406 registrations.stream()
407 .filter(r -> r.sessionId() != session.id())
408 .collect(Collectors.toList());
409 if (leader.sessionId() == session.id()) {
410 if (updatedRegistrations.size() > 0) {
411 return new ElectionState(updatedRegistrations,
412 updatedRegistrations.get(0),
413 termCounter.get(),
414 System.currentTimeMillis());
415 } else {
416 return new ElectionState(updatedRegistrations, null, term, termStartTime);
417 }
418 } else {
419 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
420 }
421 } else {
422 return this;
423 }
424 }
425
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800426 public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
427 Optional<Registration> registration =
428 registrations.stream().filter(r -> r.nodeId.equals(nodeId)).findFirst();
429 if (registration.isPresent()) {
430 List<Registration> updatedRegistrations =
431 registrations.stream()
432 .filter(r -> !r.nodeId().equals(nodeId))
433 .collect(Collectors.toList());
434 if (leader.nodeId().equals(nodeId)) {
435 if (updatedRegistrations.size() > 0) {
436 return new ElectionState(updatedRegistrations,
437 updatedRegistrations.get(0),
438 termCounter.get(),
439 System.currentTimeMillis());
440 } else {
441 return new ElectionState(updatedRegistrations, null, term, termStartTime);
442 }
443 } else {
444 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
445 }
446 } else {
447 return this;
448 }
449 }
450
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800451 public boolean isDuplicate(Registration registration) {
452 return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
453 }
454
455 public Leader leader() {
456 if (leader == null) {
457 return null;
458 } else {
459 NodeId leaderNodeId = leader.nodeId();
460 return new Leader(leaderNodeId, term, termStartTime);
461 }
462 }
463
464 public List<NodeId> candidates() {
465 return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
466 }
467
468 public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
469 if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
470 List<Registration> updatedRegistrations = new LinkedList<>(registrations);
471 updatedRegistrations.add(registration);
472 boolean newLeader = leader == null;
473 return new ElectionState(updatedRegistrations,
474 newLeader ? registration : leader,
475 newLeader ? termCounter.get() : term,
476 newLeader ? System.currentTimeMillis() : termStartTime);
477 }
478 return this;
479 }
480
481 public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
482 Registration newLeader = registrations.stream()
483 .filter(r -> r.nodeId().equals(nodeId))
484 .findFirst()
485 .orElse(null);
486 if (newLeader != null) {
487 return new ElectionState(registrations,
488 newLeader,
489 termCounter.incrementAndGet(),
490 System.currentTimeMillis());
491 } else {
492 return this;
493 }
494 }
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800495
496 public ElectionState promote(NodeId nodeId) {
497 Registration registration = registrations.stream()
498 .filter(r -> r.nodeId().equals(nodeId))
499 .findFirst()
500 .orElse(null);
501 List<Registration> updatedRegistrations = Lists.newLinkedList();
502 updatedRegistrations.add(registration);
503 registrations.stream()
504 .filter(r -> !r.nodeId().equals(nodeId))
505 .forEach(updatedRegistrations::add);
506 return new ElectionState(updatedRegistrations,
507 leader,
508 term,
509 termStartTime);
510
511 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800512 }
513
514 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800515 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800516 }
517
518 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800519 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800520 onSessionEnd(session);
521 }
522
523 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800524 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800525 onSessionEnd(session);
526 }
527
528 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800529 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800530 onSessionEnd(session);
531 }
532
533 @Override
534 public void snapshot(SnapshotWriter writer) {
535 byte[] encodedTermCounters = serializer.encode(termCounters);
536 writer.writeInt(encodedTermCounters.length);
537 writer.write(encodedTermCounters);
538 byte[] encodedElections = serializer.encode(elections);
539 writer.writeInt(encodedElections.length);
540 writer.write(encodedElections);
541 log.info("Took state machine snapshot");
542 }
543
544 @Override
545 public void install(SnapshotReader reader) {
546 int encodedTermCountersSize = reader.readInt();
547 byte[] encodedTermCounters = new byte[encodedTermCountersSize];
548 reader.read(encodedTermCounters);
549 termCounters = serializer.decode(encodedTermCounters);
550 int encodedElectionsSize = reader.readInt();
551 byte[] encodedElections = new byte[encodedElectionsSize];
552 reader.read(encodedElections);
553 elections = serializer.decode(encodedElections);
554 log.info("Reinstated state machine from snapshot");
555 }
556
557 private AtomicLong termCounter(String topic) {
558 return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
559 }
560}