blob: ef8c4447db0dbc1794aa8f88a11540f8e398b2cb [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
Madan Jampani3a9911c2016-02-21 11:25:45 -080019import io.atomix.copycat.server.session.ServerSession;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080020import io.atomix.copycat.server.Commit;
21import io.atomix.copycat.server.Snapshottable;
22import io.atomix.copycat.server.StateMachineExecutor;
23import io.atomix.copycat.server.session.SessionListener;
24import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
25import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
26import io.atomix.resource.ResourceStateMachine;
Madan Jampani86cb2432016-02-17 11:07:56 -080027import io.atomix.resource.ResourceType;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080028
29import java.util.Arrays;
30import java.util.HashMap;
31import java.util.LinkedHashMap;
32import java.util.LinkedList;
33import java.util.List;
34import java.util.Map;
35import java.util.Optional;
36import java.util.Set;
37import java.util.concurrent.atomic.AtomicLong;
38import java.util.function.Supplier;
39import java.util.stream.Collectors;
40
41import org.onosproject.cluster.Leader;
42import org.onosproject.cluster.Leadership;
43import org.onosproject.cluster.NodeId;
44import org.onosproject.event.Change;
Madan Jampanifc981772016-02-16 09:46:42 -080045import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080046import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Evict;
Madan Jampanifc981772016-02-16 09:46:42 -080047import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
48import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
49import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
50import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
Madan Jampani0c0cdc62016-02-22 16:54:06 -080051import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
Madan Jampanifc981772016-02-16 09:46:42 -080052import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
53import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
54import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055import org.onosproject.store.serializers.KryoNamespaces;
56import org.onosproject.store.service.Serializer;
57import org.slf4j.Logger;
58
59import com.google.common.base.MoreObjects;
60import com.google.common.base.Objects;
61import com.google.common.collect.Lists;
62import com.google.common.collect.Maps;
63
/**
 * State machine for {@link AtomixLeaderElector} resource.
 */
67public class AtomixLeaderElectorState extends ResourceStateMachine
68 implements SessionListener, Snapshottable {
69
70 private final Logger log = getLogger(getClass());
71 private Map<String, AtomicLong> termCounters = new HashMap<>();
72 private Map<String, ElectionState> elections = new HashMap<>();
Madan Jampanifc981772016-02-16 09:46:42 -080073 private final Map<Long, Commit<? extends Listen>> listeners = new LinkedHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080074 private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
75 ElectionState.class,
76 Registration.class);
77
/**
 * Creates the state machine, passing a {@link ResourceType} built from
 * {@link AtomixLeaderElector} to the superclass.
 */
public AtomixLeaderElectorState() {
    super(new ResourceType(AtomixLeaderElector.class));
}
81
Madan Jampani5e5b3d62016-02-01 16:03:33 -080082 @Override
83 protected void configure(StateMachineExecutor executor) {
84 // Notification
Madan Jampanifc981772016-02-16 09:46:42 -080085 executor.register(Listen.class, this::listen);
86 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080087 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -080088 executor.register(Run.class, this::run);
89 executor.register(Withdraw.class, this::withdraw);
90 executor.register(Anoint.class, this::anoint);
Madan Jampani0c0cdc62016-02-22 16:54:06 -080091 executor.register(Promote.class, this::promote);
92 executor.register(Evict.class, this::evict);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080093 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -080094 executor.register(GetLeadership.class, this::leadership);
95 executor.register(GetAllLeaderships.class, this::allLeaderships);
96 executor.register(GetElectedTopics.class, this::electedTopics);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080097 }
98
99 private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800100 notifyLeadershipChanges(Arrays.asList(new Change<>(previousLeadership, newLeadership)));
101 }
102
103 private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
104 if (changes.isEmpty()) {
105 return;
106 }
107 listeners.values()
108 .forEach(listener -> listener.session()
109 .publish(AtomixLeaderElector.CHANGE_SUBJECT, changes));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800110 }
111
112 @Override
113 public void delete() {
114 // Close and clear Listeners
115 listeners.values().forEach(Commit::close);
116 listeners.clear();
117 }
118
119 /**
120 * Applies listen commits.
121 *
122 * @param commit listen commit
123 */
Madan Jampanifc981772016-02-16 09:46:42 -0800124 public void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800125 if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
126 commit.close();
127 }
128 }
129
130 /**
131 * Applies unlisten commits.
132 *
133 * @param commit unlisten commit
134 */
Madan Jampanifc981772016-02-16 09:46:42 -0800135 public void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800136 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800137 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800138 if (listener != null) {
139 listener.close();
140 }
141 } finally {
142 commit.close();
143 }
144 }
145
146 /**
147 * Applies an {@link AtomixLeaderElectorCommands.Run} commit.
148 * @param commit commit entry
149 * @return topic leader. If no previous leader existed this is the node that just entered the race.
150 */
Madan Jampanifc981772016-02-16 09:46:42 -0800151 public Leadership run(Commit<? extends Run> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800152 try {
153 String topic = commit.operation().topic();
154 Leadership oldLeadership = leadership(topic);
155 Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
156 elections.compute(topic, (k, v) -> {
157 if (v == null) {
158 return new ElectionState(registration, termCounter(topic)::incrementAndGet);
159 } else {
160 if (!v.isDuplicate(registration)) {
161 return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
162 } else {
163 return v;
164 }
165 }
166 });
167 Leadership newLeadership = leadership(topic);
168
169 if (!Objects.equal(oldLeadership, newLeadership)) {
170 notifyLeadershipChange(oldLeadership, newLeadership);
171 }
172 return newLeadership;
173 } finally {
174 commit.close();
175 }
176 }
177
178 /**
179 * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
180 * @param commit withdraw commit
181 */
Madan Jampanifc981772016-02-16 09:46:42 -0800182 public void withdraw(Commit<? extends Withdraw> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800183 try {
184 String topic = commit.operation().topic();
185 Leadership oldLeadership = leadership(topic);
186 elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
187 termCounter(topic)::incrementAndGet));
188 Leadership newLeadership = leadership(topic);
189 if (!Objects.equal(oldLeadership, newLeadership)) {
190 notifyLeadershipChange(oldLeadership, newLeadership);
191 }
192 } finally {
193 commit.close();
194 }
195 }
196
197 /**
198 * Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
199 * @param commit anoint commit
200 * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
201 */
Madan Jampanifc981772016-02-16 09:46:42 -0800202 public boolean anoint(Commit<? extends Anoint> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800203 try {
204 String topic = commit.operation().topic();
205 Leadership oldLeadership = leadership(topic);
206 ElectionState electionState = elections.computeIfPresent(topic,
207 (k, v) -> new ElectionState(v).transferLeadership(commit.operation().nodeId(), termCounter(topic)));
208 Leadership newLeadership = leadership(topic);
209 if (!Objects.equal(oldLeadership, newLeadership)) {
210 notifyLeadershipChange(oldLeadership, newLeadership);
211 }
212 return (electionState != null &&
213 electionState.leader() != null &&
214 commit.operation().nodeId().equals(electionState.leader().nodeId()));
215 } finally {
216 commit.close();
217 }
218 }
219
220 /**
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800221 * Applies an {@link AtomixLeaderElectorCommands.Promote} commit.
222 * @param commit promote commit
223 * @return {@code true} if changes desired end state is achieved.
224 */
225 public boolean promote(Commit<? extends Promote> commit) {
226 try {
227 String topic = commit.operation().topic();
228 NodeId nodeId = commit.operation().nodeId();
229 Leadership oldLeadership = leadership(topic);
230 if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
231 return false;
232 }
233 elections.computeIfPresent(topic, (k, v) -> new ElectionState(v).promote(commit.operation().nodeId()));
234 Leadership newLeadership = leadership(topic);
235 if (!Objects.equal(oldLeadership, newLeadership)) {
236 notifyLeadershipChange(oldLeadership, newLeadership);
237 }
238 return true;
239 } finally {
240 commit.close();
241 }
242 }
243
244 /**
245 * Applies an {@link AtomixLeaderElectorCommands.Evict} commit.
246 * @param commit evict commit
247 */
248 public void evict(Commit<? extends Evict> commit) {
249 try {
250 List<Change<Leadership>> changes = Lists.newLinkedList();
251 NodeId nodeId = commit.operation().nodeId();
252 Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
253 topics.forEach(topic -> {
254 Leadership oldLeadership = leadership(topic);
255 elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
256 Leadership newLeadership = leadership(topic);
257 if (!Objects.equal(oldLeadership, newLeadership)) {
258 changes.add(new Change<>(oldLeadership, newLeadership));
259 }
260 });
261 notifyLeadershipChanges(changes);
262 } finally {
263 commit.close();
264 }
265 }
266
267 /**
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800268 * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
269 * @param commit GetLeadership commit
270 * @return leader
271 */
Madan Jampanifc981772016-02-16 09:46:42 -0800272 public Leadership leadership(Commit<? extends GetLeadership> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800273 String topic = commit.operation().topic();
274 try {
275 return leadership(topic);
276 } finally {
277 commit.close();
278 }
279 }
280
281 /**
282 * Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
283 * @param commit commit entry
284 * @return set of topics for which the node is the leader
285 */
Madan Jampanifc981772016-02-16 09:46:42 -0800286 public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800287 try {
288 NodeId nodeId = commit.operation().nodeId();
289 return Maps.filterEntries(elections, e -> {
290 Leader leader = leadership(e.getKey()).leader();
291 return leader != null && leader.nodeId().equals(nodeId);
292 }).keySet();
293 } finally {
294 commit.close();
295 }
296 }
297
298 /**
299 * Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
300 * @param commit GetAllLeaderships commit
301 * @return topic to leader mapping
302 */
Madan Jampanifc981772016-02-16 09:46:42 -0800303 public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800304 try {
305 return Maps.transformEntries(elections, (k, v) -> leadership(k));
306 } finally {
307 commit.close();
308 }
309 }
310
311 private Leadership leadership(String topic) {
312 return new Leadership(topic,
313 leader(topic),
314 candidates(topic));
315 }
316
317 private Leader leader(String topic) {
318 ElectionState electionState = elections.get(topic);
319 return electionState == null ? null : electionState.leader();
320 }
321
322 private List<NodeId> candidates(String topic) {
323 ElectionState electionState = elections.get(topic);
324 return electionState == null ? new LinkedList<>() : electionState.candidates();
325 }
326
Madan Jampani3a9911c2016-02-21 11:25:45 -0800327 private void onSessionEnd(ServerSession session) {
Madan Jampani86cb2432016-02-17 11:07:56 -0800328 Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800329 if (listener != null) {
330 listener.close();
331 }
332 Set<String> topics = elections.keySet();
333 topics.forEach(topic -> {
334 Leadership oldLeadership = leadership(topic);
335 elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
336 Leadership newLeadership = leadership(topic);
337 if (!Objects.equal(oldLeadership, newLeadership)) {
338 notifyLeadershipChange(oldLeadership, newLeadership);
339 }
340 });
341 }
342
343 private static class Registration {
344 private final NodeId nodeId;
345 private final long sessionId;
346
347 public Registration(NodeId nodeId, long sessionId) {
348 this.nodeId = nodeId;
349 this.sessionId = sessionId;
350 }
351
352 public NodeId nodeId() {
353 return nodeId;
354 }
355
356 public long sessionId() {
357 return sessionId;
358 }
359
360 @Override
361 public String toString() {
362 return MoreObjects.toStringHelper(getClass())
363 .add("nodeId", nodeId)
364 .add("sessionId", sessionId)
365 .toString();
366 }
367 }
368
369 private static class ElectionState {
370 final Registration leader;
371 final long term;
372 final long termStartTime;
373 final List<Registration> registrations;
374
375 public ElectionState(Registration registration, Supplier<Long> termCounter) {
376 registrations = Arrays.asList(registration);
377 term = termCounter.get();
378 termStartTime = System.currentTimeMillis();
379 leader = registration;
380 }
381
382 public ElectionState(ElectionState other) {
383 registrations = Lists.newArrayList(other.registrations);
384 leader = other.leader;
385 term = other.term;
386 termStartTime = other.termStartTime;
387 }
388
389 public ElectionState(List<Registration> registrations,
390 Registration leader,
391 long term,
392 long termStartTime) {
393 this.registrations = Lists.newArrayList(registrations);
394 this.leader = leader;
395 this.term = term;
396 this.termStartTime = termStartTime;
397 }
398
Madan Jampani3a9911c2016-02-21 11:25:45 -0800399 public ElectionState cleanup(ServerSession session, Supplier<Long> termCounter) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800400 Optional<Registration> registration =
401 registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
402 if (registration.isPresent()) {
403 List<Registration> updatedRegistrations =
404 registrations.stream()
405 .filter(r -> r.sessionId() != session.id())
406 .collect(Collectors.toList());
407 if (leader.sessionId() == session.id()) {
408 if (updatedRegistrations.size() > 0) {
409 return new ElectionState(updatedRegistrations,
410 updatedRegistrations.get(0),
411 termCounter.get(),
412 System.currentTimeMillis());
413 } else {
414 return new ElectionState(updatedRegistrations, null, term, termStartTime);
415 }
416 } else {
417 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
418 }
419 } else {
420 return this;
421 }
422 }
423
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800424 public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
425 Optional<Registration> registration =
426 registrations.stream().filter(r -> r.nodeId.equals(nodeId)).findFirst();
427 if (registration.isPresent()) {
428 List<Registration> updatedRegistrations =
429 registrations.stream()
430 .filter(r -> !r.nodeId().equals(nodeId))
431 .collect(Collectors.toList());
432 if (leader.nodeId().equals(nodeId)) {
433 if (updatedRegistrations.size() > 0) {
434 return new ElectionState(updatedRegistrations,
435 updatedRegistrations.get(0),
436 termCounter.get(),
437 System.currentTimeMillis());
438 } else {
439 return new ElectionState(updatedRegistrations, null, term, termStartTime);
440 }
441 } else {
442 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
443 }
444 } else {
445 return this;
446 }
447 }
448
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800449 public boolean isDuplicate(Registration registration) {
450 return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
451 }
452
453 public Leader leader() {
454 if (leader == null) {
455 return null;
456 } else {
457 NodeId leaderNodeId = leader.nodeId();
458 return new Leader(leaderNodeId, term, termStartTime);
459 }
460 }
461
462 public List<NodeId> candidates() {
463 return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
464 }
465
466 public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
467 if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
468 List<Registration> updatedRegistrations = new LinkedList<>(registrations);
469 updatedRegistrations.add(registration);
470 boolean newLeader = leader == null;
471 return new ElectionState(updatedRegistrations,
472 newLeader ? registration : leader,
473 newLeader ? termCounter.get() : term,
474 newLeader ? System.currentTimeMillis() : termStartTime);
475 }
476 return this;
477 }
478
479 public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
480 Registration newLeader = registrations.stream()
481 .filter(r -> r.nodeId().equals(nodeId))
482 .findFirst()
483 .orElse(null);
484 if (newLeader != null) {
485 return new ElectionState(registrations,
486 newLeader,
487 termCounter.incrementAndGet(),
488 System.currentTimeMillis());
489 } else {
490 return this;
491 }
492 }
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800493
494 public ElectionState promote(NodeId nodeId) {
495 Registration registration = registrations.stream()
496 .filter(r -> r.nodeId().equals(nodeId))
497 .findFirst()
498 .orElse(null);
499 List<Registration> updatedRegistrations = Lists.newLinkedList();
500 updatedRegistrations.add(registration);
501 registrations.stream()
502 .filter(r -> !r.nodeId().equals(nodeId))
503 .forEach(updatedRegistrations::add);
504 return new ElectionState(updatedRegistrations,
505 leader,
506 term,
507 termStartTime);
508
509 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800510 }
511
512 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800513 public void register(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800514 }
515
516 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800517 public void unregister(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800518 onSessionEnd(session);
519 }
520
521 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800522 public void expire(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800523 onSessionEnd(session);
524 }
525
526 @Override
Madan Jampani3a9911c2016-02-21 11:25:45 -0800527 public void close(ServerSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800528 onSessionEnd(session);
529 }
530
531 @Override
532 public void snapshot(SnapshotWriter writer) {
533 byte[] encodedTermCounters = serializer.encode(termCounters);
534 writer.writeInt(encodedTermCounters.length);
535 writer.write(encodedTermCounters);
536 byte[] encodedElections = serializer.encode(elections);
537 writer.writeInt(encodedElections.length);
538 writer.write(encodedElections);
539 log.info("Took state machine snapshot");
540 }
541
542 @Override
543 public void install(SnapshotReader reader) {
544 int encodedTermCountersSize = reader.readInt();
545 byte[] encodedTermCounters = new byte[encodedTermCountersSize];
546 reader.read(encodedTermCounters);
547 termCounters = serializer.decode(encodedTermCounters);
548 int encodedElectionsSize = reader.readInt();
549 byte[] encodedElections = new byte[encodedElectionsSize];
550 reader.read(encodedElections);
551 elections = serializer.decode(encodedElections);
552 log.info("Reinstated state machine from snapshot");
553 }
554
555 private AtomicLong termCounter(String topic) {
556 return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
557 }
558}