/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
19import io.atomix.copycat.client.session.Session;
20import io.atomix.copycat.server.Commit;
21import io.atomix.copycat.server.Snapshottable;
22import io.atomix.copycat.server.StateMachineExecutor;
23import io.atomix.copycat.server.session.SessionListener;
24import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
25import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
26import io.atomix.resource.ResourceStateMachine;
27
28import java.util.Arrays;
29import java.util.HashMap;
30import java.util.LinkedHashMap;
31import java.util.LinkedList;
32import java.util.List;
33import java.util.Map;
34import java.util.Optional;
35import java.util.Set;
36import java.util.concurrent.atomic.AtomicLong;
37import java.util.function.Supplier;
38import java.util.stream.Collectors;
39
40import org.onosproject.cluster.Leader;
41import org.onosproject.cluster.Leadership;
42import org.onosproject.cluster.NodeId;
43import org.onosproject.event.Change;
Madan Jampanifc981772016-02-16 09:46:42 -080044import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
45import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
46import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
47import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
48import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
49import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
50import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
51import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080052import org.onosproject.store.serializers.KryoNamespaces;
53import org.onosproject.store.service.Serializer;
54import org.slf4j.Logger;
55
56import com.google.common.base.MoreObjects;
57import com.google.common.base.Objects;
58import com.google.common.collect.Lists;
59import com.google.common.collect.Maps;
60
/**
 * State machine for {@link AtomixLeaderElector} resource.
 */
64public class AtomixLeaderElectorState extends ResourceStateMachine
65 implements SessionListener, Snapshottable {
66
67 private final Logger log = getLogger(getClass());
68 private Map<String, AtomicLong> termCounters = new HashMap<>();
69 private Map<String, ElectionState> elections = new HashMap<>();
Madan Jampanifc981772016-02-16 09:46:42 -080070 private final Map<Long, Commit<? extends Listen>> listeners = new LinkedHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080071 private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
72 ElectionState.class,
73 Registration.class);
74
75 @Override
76 protected void configure(StateMachineExecutor executor) {
77 // Notification
Madan Jampanifc981772016-02-16 09:46:42 -080078 executor.register(Listen.class, this::listen);
79 executor.register(Unlisten.class, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080080 // Commands
Madan Jampanifc981772016-02-16 09:46:42 -080081 executor.register(Run.class, this::run);
82 executor.register(Withdraw.class, this::withdraw);
83 executor.register(Anoint.class, this::anoint);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080084 // Queries
Madan Jampanifc981772016-02-16 09:46:42 -080085 executor.register(GetLeadership.class, this::leadership);
86 executor.register(GetAllLeaderships.class, this::allLeaderships);
87 executor.register(GetElectedTopics.class, this::electedTopics);
Madan Jampani5e5b3d62016-02-01 16:03:33 -080088 }
89
90 private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
91 Change<Leadership> change = new Change<>(previousLeadership, newLeadership);
92 listeners.values().forEach(listener -> listener.session().publish("change", change));
93 }
94
95 @Override
96 public void delete() {
97 // Close and clear Listeners
98 listeners.values().forEach(Commit::close);
99 listeners.clear();
100 }
101
102 /**
103 * Applies listen commits.
104 *
105 * @param commit listen commit
106 */
Madan Jampanifc981772016-02-16 09:46:42 -0800107 public void listen(Commit<? extends Listen> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800108 if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
109 commit.close();
110 }
111 }
112
113 /**
114 * Applies unlisten commits.
115 *
116 * @param commit unlisten commit
117 */
Madan Jampanifc981772016-02-16 09:46:42 -0800118 public void unlisten(Commit<? extends Unlisten> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800119 try {
Madan Jampanifc981772016-02-16 09:46:42 -0800120 Commit<? extends Listen> listener = listeners.remove(commit.session().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800121 if (listener != null) {
122 listener.close();
123 }
124 } finally {
125 commit.close();
126 }
127 }
128
129 /**
130 * Applies an {@link AtomixLeaderElectorCommands.Run} commit.
131 * @param commit commit entry
132 * @return topic leader. If no previous leader existed this is the node that just entered the race.
133 */
Madan Jampanifc981772016-02-16 09:46:42 -0800134 public Leadership run(Commit<? extends Run> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800135 try {
136 String topic = commit.operation().topic();
137 Leadership oldLeadership = leadership(topic);
138 Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
139 elections.compute(topic, (k, v) -> {
140 if (v == null) {
141 return new ElectionState(registration, termCounter(topic)::incrementAndGet);
142 } else {
143 if (!v.isDuplicate(registration)) {
144 return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
145 } else {
146 return v;
147 }
148 }
149 });
150 Leadership newLeadership = leadership(topic);
151
152 if (!Objects.equal(oldLeadership, newLeadership)) {
153 notifyLeadershipChange(oldLeadership, newLeadership);
154 }
155 return newLeadership;
156 } finally {
157 commit.close();
158 }
159 }
160
161 /**
162 * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
163 * @param commit withdraw commit
164 */
Madan Jampanifc981772016-02-16 09:46:42 -0800165 public void withdraw(Commit<? extends Withdraw> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800166 try {
167 String topic = commit.operation().topic();
168 Leadership oldLeadership = leadership(topic);
169 elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
170 termCounter(topic)::incrementAndGet));
171 Leadership newLeadership = leadership(topic);
172 if (!Objects.equal(oldLeadership, newLeadership)) {
173 notifyLeadershipChange(oldLeadership, newLeadership);
174 }
175 } finally {
176 commit.close();
177 }
178 }
179
180 /**
181 * Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
182 * @param commit anoint commit
183 * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
184 */
Madan Jampanifc981772016-02-16 09:46:42 -0800185 public boolean anoint(Commit<? extends Anoint> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800186 try {
187 String topic = commit.operation().topic();
188 Leadership oldLeadership = leadership(topic);
189 ElectionState electionState = elections.computeIfPresent(topic,
190 (k, v) -> new ElectionState(v).transferLeadership(commit.operation().nodeId(), termCounter(topic)));
191 Leadership newLeadership = leadership(topic);
192 if (!Objects.equal(oldLeadership, newLeadership)) {
193 notifyLeadershipChange(oldLeadership, newLeadership);
194 }
195 return (electionState != null &&
196 electionState.leader() != null &&
197 commit.operation().nodeId().equals(electionState.leader().nodeId()));
198 } finally {
199 commit.close();
200 }
201 }
202
203 /**
204 * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
205 * @param commit GetLeadership commit
206 * @return leader
207 */
Madan Jampanifc981772016-02-16 09:46:42 -0800208 public Leadership leadership(Commit<? extends GetLeadership> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800209 String topic = commit.operation().topic();
210 try {
211 return leadership(topic);
212 } finally {
213 commit.close();
214 }
215 }
216
217 /**
218 * Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
219 * @param commit commit entry
220 * @return set of topics for which the node is the leader
221 */
Madan Jampanifc981772016-02-16 09:46:42 -0800222 public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800223 try {
224 NodeId nodeId = commit.operation().nodeId();
225 return Maps.filterEntries(elections, e -> {
226 Leader leader = leadership(e.getKey()).leader();
227 return leader != null && leader.nodeId().equals(nodeId);
228 }).keySet();
229 } finally {
230 commit.close();
231 }
232 }
233
234 /**
235 * Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
236 * @param commit GetAllLeaderships commit
237 * @return topic to leader mapping
238 */
Madan Jampanifc981772016-02-16 09:46:42 -0800239 public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800240 try {
241 return Maps.transformEntries(elections, (k, v) -> leadership(k));
242 } finally {
243 commit.close();
244 }
245 }
246
247 private Leadership leadership(String topic) {
248 return new Leadership(topic,
249 leader(topic),
250 candidates(topic));
251 }
252
253 private Leader leader(String topic) {
254 ElectionState electionState = elections.get(topic);
255 return electionState == null ? null : electionState.leader();
256 }
257
258 private List<NodeId> candidates(String topic) {
259 ElectionState electionState = elections.get(topic);
260 return electionState == null ? new LinkedList<>() : electionState.candidates();
261 }
262
263 private void onSessionEnd(Session session) {
264 Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session);
265 if (listener != null) {
266 listener.close();
267 }
268 Set<String> topics = elections.keySet();
269 topics.forEach(topic -> {
270 Leadership oldLeadership = leadership(topic);
271 elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
272 Leadership newLeadership = leadership(topic);
273 if (!Objects.equal(oldLeadership, newLeadership)) {
274 notifyLeadershipChange(oldLeadership, newLeadership);
275 }
276 });
277 }
278
279 private static class Registration {
280 private final NodeId nodeId;
281 private final long sessionId;
282
283 public Registration(NodeId nodeId, long sessionId) {
284 this.nodeId = nodeId;
285 this.sessionId = sessionId;
286 }
287
288 public NodeId nodeId() {
289 return nodeId;
290 }
291
292 public long sessionId() {
293 return sessionId;
294 }
295
296 @Override
297 public String toString() {
298 return MoreObjects.toStringHelper(getClass())
299 .add("nodeId", nodeId)
300 .add("sessionId", sessionId)
301 .toString();
302 }
303 }
304
305 private static class ElectionState {
306 final Registration leader;
307 final long term;
308 final long termStartTime;
309 final List<Registration> registrations;
310
311 public ElectionState(Registration registration, Supplier<Long> termCounter) {
312 registrations = Arrays.asList(registration);
313 term = termCounter.get();
314 termStartTime = System.currentTimeMillis();
315 leader = registration;
316 }
317
318 public ElectionState(ElectionState other) {
319 registrations = Lists.newArrayList(other.registrations);
320 leader = other.leader;
321 term = other.term;
322 termStartTime = other.termStartTime;
323 }
324
325 public ElectionState(List<Registration> registrations,
326 Registration leader,
327 long term,
328 long termStartTime) {
329 this.registrations = Lists.newArrayList(registrations);
330 this.leader = leader;
331 this.term = term;
332 this.termStartTime = termStartTime;
333 }
334
335 public ElectionState cleanup(Session session, Supplier<Long> termCounter) {
336 Optional<Registration> registration =
337 registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
338 if (registration.isPresent()) {
339 List<Registration> updatedRegistrations =
340 registrations.stream()
341 .filter(r -> r.sessionId() != session.id())
342 .collect(Collectors.toList());
343 if (leader.sessionId() == session.id()) {
344 if (updatedRegistrations.size() > 0) {
345 return new ElectionState(updatedRegistrations,
346 updatedRegistrations.get(0),
347 termCounter.get(),
348 System.currentTimeMillis());
349 } else {
350 return new ElectionState(updatedRegistrations, null, term, termStartTime);
351 }
352 } else {
353 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
354 }
355 } else {
356 return this;
357 }
358 }
359
360 public boolean isDuplicate(Registration registration) {
361 return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
362 }
363
364 public Leader leader() {
365 if (leader == null) {
366 return null;
367 } else {
368 NodeId leaderNodeId = leader.nodeId();
369 return new Leader(leaderNodeId, term, termStartTime);
370 }
371 }
372
373 public List<NodeId> candidates() {
374 return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
375 }
376
377 public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
378 if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
379 List<Registration> updatedRegistrations = new LinkedList<>(registrations);
380 updatedRegistrations.add(registration);
381 boolean newLeader = leader == null;
382 return new ElectionState(updatedRegistrations,
383 newLeader ? registration : leader,
384 newLeader ? termCounter.get() : term,
385 newLeader ? System.currentTimeMillis() : termStartTime);
386 }
387 return this;
388 }
389
390 public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
391 Registration newLeader = registrations.stream()
392 .filter(r -> r.nodeId().equals(nodeId))
393 .findFirst()
394 .orElse(null);
395 if (newLeader != null) {
396 return new ElectionState(registrations,
397 newLeader,
398 termCounter.incrementAndGet(),
399 System.currentTimeMillis());
400 } else {
401 return this;
402 }
403 }
404 }
405
406 @Override
407 public void register(Session session) {
408 }
409
410 @Override
411 public void unregister(Session session) {
412 onSessionEnd(session);
413 }
414
415 @Override
416 public void expire(Session session) {
417 onSessionEnd(session);
418 }
419
420 @Override
421 public void close(Session session) {
422 onSessionEnd(session);
423 }
424
425 @Override
426 public void snapshot(SnapshotWriter writer) {
427 byte[] encodedTermCounters = serializer.encode(termCounters);
428 writer.writeInt(encodedTermCounters.length);
429 writer.write(encodedTermCounters);
430 byte[] encodedElections = serializer.encode(elections);
431 writer.writeInt(encodedElections.length);
432 writer.write(encodedElections);
433 log.info("Took state machine snapshot");
434 }
435
436 @Override
437 public void install(SnapshotReader reader) {
438 int encodedTermCountersSize = reader.readInt();
439 byte[] encodedTermCounters = new byte[encodedTermCountersSize];
440 reader.read(encodedTermCounters);
441 termCounters = serializer.decode(encodedTermCounters);
442 int encodedElectionsSize = reader.readInt();
443 byte[] encodedElections = new byte[encodedElectionsSize];
444 reader.read(encodedElections);
445 elections = serializer.decode(encodedElections);
446 log.info("Reinstated state machine from snapshot");
447 }
448
449 private AtomicLong termCounter(String topic) {
450 return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
451 }
452}