blob: 9b5822621c2b5e3d8509b4b4a80a7da5485ab5aa [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
19import io.atomix.copycat.client.session.Session;
20import io.atomix.copycat.server.Commit;
21import io.atomix.copycat.server.Snapshottable;
22import io.atomix.copycat.server.StateMachineExecutor;
23import io.atomix.copycat.server.session.SessionListener;
24import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
25import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
26import io.atomix.resource.ResourceStateMachine;
Madan Jampani86cb2432016-02-17 11:07:56 -080027import io.atomix.resource.ResourceType;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080028
29import java.util.Arrays;
30import java.util.HashMap;
31import java.util.LinkedHashMap;
32import java.util.LinkedList;
33import java.util.List;
34import java.util.Map;
35import java.util.Optional;
36import java.util.Set;
37import java.util.concurrent.atomic.AtomicLong;
38import java.util.function.Supplier;
39import java.util.stream.Collectors;
40
41import org.onosproject.cluster.Leader;
42import org.onosproject.cluster.Leadership;
43import org.onosproject.cluster.NodeId;
44import org.onosproject.event.Change;
Madan Jampanifc981772016-02-16 09:46:42 -080045import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
46import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
47import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
48import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
49import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
50import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
51import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
52import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080053import org.onosproject.store.serializers.KryoNamespaces;
54import org.onosproject.store.service.Serializer;
55import org.slf4j.Logger;
56
57import com.google.common.base.MoreObjects;
58import com.google.common.base.Objects;
59import com.google.common.collect.Lists;
60import com.google.common.collect.Maps;
61
62/**
63 * State machine for {@link AtomixLeaderElector} resource.
64 */
65public class AtomixLeaderElectorState extends ResourceStateMachine
66 implements SessionListener, Snapshottable {
67
68 private final Logger log = getLogger(getClass());
69 private Map<String, AtomicLong> termCounters = new HashMap<>();
70 private Map<String, ElectionState> elections = new HashMap<>();
Madan Jampanifc981772016-02-16 09:46:42 -080071 private final Map<Long, Commit<? extends Listen>> listeners = new LinkedHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080072 private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
73 ElectionState.class,
74 Registration.class);
75
Madan Jampani86cb2432016-02-17 11:07:56 -080076 public AtomixLeaderElectorState() {
77 super(new ResourceType(AtomixLeaderElector.class));
78 }
79
Madan Jampani5e5b3d62016-02-01 16:03:33 -080080 @Override
81 protected void configure(StateMachineExecutor executor) {
82 // Notification
Madan Jampanifc981772016-02-16 09:46:42 -080083 executor.register(Listen.class, this::listen);
84 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080085 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -080086 executor.register(Run.class, this::run);
87 executor.register(Withdraw.class, this::withdraw);
88 executor.register(Anoint.class, this::anoint);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080089 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -080090 executor.register(GetLeadership.class, this::leadership);
91 executor.register(GetAllLeaderships.class, this::allLeaderships);
92 executor.register(GetElectedTopics.class, this::electedTopics);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080093 }
94
95 private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
96 Change<Leadership> change = new Change<>(previousLeadership, newLeadership);
97 listeners.values().forEach(listener -> listener.session().publish("change", change));
98 }
99
100 @Override
101 public void delete() {
102 // Close and clear Listeners
103 listeners.values().forEach(Commit::close);
104 listeners.clear();
105 }
106
107 /**
108 * Applies listen commits.
109 *
110 * @param commit listen commit
111 */
Madan Jampanifc981772016-02-16 09:46:42 -0800112 public void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800113 if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
114 commit.close();
115 }
116 }
117
118 /**
119 * Applies unlisten commits.
120 *
121 * @param commit unlisten commit
122 */
Madan Jampanifc981772016-02-16 09:46:42 -0800123 public void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800124 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800125 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800126 if (listener != null) {
127 listener.close();
128 }
129 } finally {
130 commit.close();
131 }
132 }
133
134 /**
135 * Applies an {@link AtomixLeaderElectorCommands.Run} commit.
136 * @param commit commit entry
137 * @return topic leader. If no previous leader existed this is the node that just entered the race.
138 */
Madan Jampanifc981772016-02-16 09:46:42 -0800139 public Leadership run(Commit<? extends Run> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800140 try {
141 String topic = commit.operation().topic();
142 Leadership oldLeadership = leadership(topic);
143 Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
144 elections.compute(topic, (k, v) -> {
145 if (v == null) {
146 return new ElectionState(registration, termCounter(topic)::incrementAndGet);
147 } else {
148 if (!v.isDuplicate(registration)) {
149 return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
150 } else {
151 return v;
152 }
153 }
154 });
155 Leadership newLeadership = leadership(topic);
156
157 if (!Objects.equal(oldLeadership, newLeadership)) {
158 notifyLeadershipChange(oldLeadership, newLeadership);
159 }
160 return newLeadership;
161 } finally {
162 commit.close();
163 }
164 }
165
166 /**
167 * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
168 * @param commit withdraw commit
169 */
Madan Jampanifc981772016-02-16 09:46:42 -0800170 public void withdraw(Commit<? extends Withdraw> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800171 try {
172 String topic = commit.operation().topic();
173 Leadership oldLeadership = leadership(topic);
174 elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
175 termCounter(topic)::incrementAndGet));
176 Leadership newLeadership = leadership(topic);
177 if (!Objects.equal(oldLeadership, newLeadership)) {
178 notifyLeadershipChange(oldLeadership, newLeadership);
179 }
180 } finally {
181 commit.close();
182 }
183 }
184
185 /**
186 * Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
187 * @param commit anoint commit
188 * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
189 */
Madan Jampanifc981772016-02-16 09:46:42 -0800190 public boolean anoint(Commit<? extends Anoint> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800191 try {
192 String topic = commit.operation().topic();
193 Leadership oldLeadership = leadership(topic);
194 ElectionState electionState = elections.computeIfPresent(topic,
195 (k, v) -> new ElectionState(v).transferLeadership(commit.operation().nodeId(), termCounter(topic)));
196 Leadership newLeadership = leadership(topic);
197 if (!Objects.equal(oldLeadership, newLeadership)) {
198 notifyLeadershipChange(oldLeadership, newLeadership);
199 }
200 return (electionState != null &&
201 electionState.leader() != null &&
202 commit.operation().nodeId().equals(electionState.leader().nodeId()));
203 } finally {
204 commit.close();
205 }
206 }
207
208 /**
209 * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
210 * @param commit GetLeadership commit
211 * @return leader
212 */
Madan Jampanifc981772016-02-16 09:46:42 -0800213 public Leadership leadership(Commit<? extends GetLeadership> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800214 String topic = commit.operation().topic();
215 try {
216 return leadership(topic);
217 } finally {
218 commit.close();
219 }
220 }
221
222 /**
223 * Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
224 * @param commit commit entry
225 * @return set of topics for which the node is the leader
226 */
Madan Jampanifc981772016-02-16 09:46:42 -0800227 public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800228 try {
229 NodeId nodeId = commit.operation().nodeId();
230 return Maps.filterEntries(elections, e -> {
231 Leader leader = leadership(e.getKey()).leader();
232 return leader != null && leader.nodeId().equals(nodeId);
233 }).keySet();
234 } finally {
235 commit.close();
236 }
237 }
238
239 /**
240 * Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
241 * @param commit GetAllLeaderships commit
242 * @return topic to leader mapping
243 */
Madan Jampanifc981772016-02-16 09:46:42 -0800244 public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800245 try {
246 return Maps.transformEntries(elections, (k, v) -> leadership(k));
247 } finally {
248 commit.close();
249 }
250 }
251
252 private Leadership leadership(String topic) {
253 return new Leadership(topic,
254 leader(topic),
255 candidates(topic));
256 }
257
258 private Leader leader(String topic) {
259 ElectionState electionState = elections.get(topic);
260 return electionState == null ? null : electionState.leader();
261 }
262
263 private List<NodeId> candidates(String topic) {
264 ElectionState electionState = elections.get(topic);
265 return electionState == null ? new LinkedList<>() : electionState.candidates();
266 }
267
268 private void onSessionEnd(Session session) {
Madan Jampani86cb2432016-02-17 11:07:56 -0800269 Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800270 if (listener != null) {
271 listener.close();
272 }
273 Set<String> topics = elections.keySet();
274 topics.forEach(topic -> {
275 Leadership oldLeadership = leadership(topic);
276 elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
277 Leadership newLeadership = leadership(topic);
278 if (!Objects.equal(oldLeadership, newLeadership)) {
279 notifyLeadershipChange(oldLeadership, newLeadership);
280 }
281 });
282 }
283
284 private static class Registration {
285 private final NodeId nodeId;
286 private final long sessionId;
287
288 public Registration(NodeId nodeId, long sessionId) {
289 this.nodeId = nodeId;
290 this.sessionId = sessionId;
291 }
292
293 public NodeId nodeId() {
294 return nodeId;
295 }
296
297 public long sessionId() {
298 return sessionId;
299 }
300
301 @Override
302 public String toString() {
303 return MoreObjects.toStringHelper(getClass())
304 .add("nodeId", nodeId)
305 .add("sessionId", sessionId)
306 .toString();
307 }
308 }
309
310 private static class ElectionState {
311 final Registration leader;
312 final long term;
313 final long termStartTime;
314 final List<Registration> registrations;
315
316 public ElectionState(Registration registration, Supplier<Long> termCounter) {
317 registrations = Arrays.asList(registration);
318 term = termCounter.get();
319 termStartTime = System.currentTimeMillis();
320 leader = registration;
321 }
322
323 public ElectionState(ElectionState other) {
324 registrations = Lists.newArrayList(other.registrations);
325 leader = other.leader;
326 term = other.term;
327 termStartTime = other.termStartTime;
328 }
329
330 public ElectionState(List<Registration> registrations,
331 Registration leader,
332 long term,
333 long termStartTime) {
334 this.registrations = Lists.newArrayList(registrations);
335 this.leader = leader;
336 this.term = term;
337 this.termStartTime = termStartTime;
338 }
339
340 public ElectionState cleanup(Session session, Supplier<Long> termCounter) {
341 Optional<Registration> registration =
342 registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
343 if (registration.isPresent()) {
344 List<Registration> updatedRegistrations =
345 registrations.stream()
346 .filter(r -> r.sessionId() != session.id())
347 .collect(Collectors.toList());
348 if (leader.sessionId() == session.id()) {
349 if (updatedRegistrations.size() > 0) {
350 return new ElectionState(updatedRegistrations,
351 updatedRegistrations.get(0),
352 termCounter.get(),
353 System.currentTimeMillis());
354 } else {
355 return new ElectionState(updatedRegistrations, null, term, termStartTime);
356 }
357 } else {
358 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
359 }
360 } else {
361 return this;
362 }
363 }
364
365 public boolean isDuplicate(Registration registration) {
366 return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
367 }
368
369 public Leader leader() {
370 if (leader == null) {
371 return null;
372 } else {
373 NodeId leaderNodeId = leader.nodeId();
374 return new Leader(leaderNodeId, term, termStartTime);
375 }
376 }
377
378 public List<NodeId> candidates() {
379 return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
380 }
381
382 public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
383 if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
384 List<Registration> updatedRegistrations = new LinkedList<>(registrations);
385 updatedRegistrations.add(registration);
386 boolean newLeader = leader == null;
387 return new ElectionState(updatedRegistrations,
388 newLeader ? registration : leader,
389 newLeader ? termCounter.get() : term,
390 newLeader ? System.currentTimeMillis() : termStartTime);
391 }
392 return this;
393 }
394
395 public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
396 Registration newLeader = registrations.stream()
397 .filter(r -> r.nodeId().equals(nodeId))
398 .findFirst()
399 .orElse(null);
400 if (newLeader != null) {
401 return new ElectionState(registrations,
402 newLeader,
403 termCounter.incrementAndGet(),
404 System.currentTimeMillis());
405 } else {
406 return this;
407 }
408 }
409 }
410
411 @Override
412 public void register(Session session) {
413 }
414
415 @Override
416 public void unregister(Session session) {
417 onSessionEnd(session);
418 }
419
420 @Override
421 public void expire(Session session) {
422 onSessionEnd(session);
423 }
424
425 @Override
426 public void close(Session session) {
427 onSessionEnd(session);
428 }
429
430 @Override
431 public void snapshot(SnapshotWriter writer) {
432 byte[] encodedTermCounters = serializer.encode(termCounters);
433 writer.writeInt(encodedTermCounters.length);
434 writer.write(encodedTermCounters);
435 byte[] encodedElections = serializer.encode(elections);
436 writer.writeInt(encodedElections.length);
437 writer.write(encodedElections);
438 log.info("Took state machine snapshot");
439 }
440
441 @Override
442 public void install(SnapshotReader reader) {
443 int encodedTermCountersSize = reader.readInt();
444 byte[] encodedTermCounters = new byte[encodedTermCountersSize];
445 reader.read(encodedTermCounters);
446 termCounters = serializer.decode(encodedTermCounters);
447 int encodedElectionsSize = reader.readInt();
448 byte[] encodedElections = new byte[encodedElectionsSize];
449 reader.read(encodedElections);
450 elections = serializer.decode(encodedElections);
451 log.info("Reinstated state machine from snapshot");
452 }
453
454 private AtomicLong termCounter(String topic) {
455 return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
456 }
457}