blob: 2ae6e680ff082140ad8da665580de4b74af260f6 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
19import io.atomix.copycat.client.session.Session;
20import io.atomix.copycat.server.Commit;
21import io.atomix.copycat.server.Snapshottable;
22import io.atomix.copycat.server.StateMachineExecutor;
23import io.atomix.copycat.server.session.SessionListener;
24import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
25import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
26import io.atomix.resource.ResourceStateMachine;
27
28import java.util.Arrays;
29import java.util.HashMap;
30import java.util.LinkedHashMap;
31import java.util.LinkedList;
32import java.util.List;
33import java.util.Map;
34import java.util.Optional;
35import java.util.Set;
36import java.util.concurrent.atomic.AtomicLong;
37import java.util.function.Supplier;
38import java.util.stream.Collectors;
39
40import org.onosproject.cluster.Leader;
41import org.onosproject.cluster.Leadership;
42import org.onosproject.cluster.NodeId;
43import org.onosproject.event.Change;
44import org.onosproject.store.serializers.KryoNamespaces;
45import org.onosproject.store.service.Serializer;
46import org.slf4j.Logger;
47
48import com.google.common.base.MoreObjects;
49import com.google.common.base.Objects;
50import com.google.common.collect.Lists;
51import com.google.common.collect.Maps;
52
53/**
54 * State machine for {@link AtomixLeaderElector} resource.
55 */
56public class AtomixLeaderElectorState extends ResourceStateMachine
57 implements SessionListener, Snapshottable {
58
59 private final Logger log = getLogger(getClass());
60 private Map<String, AtomicLong> termCounters = new HashMap<>();
61 private Map<String, ElectionState> elections = new HashMap<>();
62 private final Map<Long, Commit<? extends AtomixLeaderElectorCommands.Listen>> listeners = new LinkedHashMap<>();
63 private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
64 ElectionState.class,
65 Registration.class);
66
67 @Override
68 protected void configure(StateMachineExecutor executor) {
69 // Notification
70 executor.register(AtomixLeaderElectorCommands.Listen.class, this::listen);
71 executor.register(AtomixLeaderElectorCommands.Unlisten.class, this::unlisten);
72 // Commands
73 executor.register(AtomixLeaderElectorCommands.Run.class, this::run);
74 executor.register(AtomixLeaderElectorCommands.Withdraw.class, this::withdraw);
75 executor.register(AtomixLeaderElectorCommands.Anoint.class, this::anoint);
76 // Queries
77 executor.register(AtomixLeaderElectorCommands.GetLeadership.class, this::leadership);
78 executor.register(AtomixLeaderElectorCommands.GetAllLeaderships.class, this::allLeaderships);
79 executor.register(AtomixLeaderElectorCommands.GetElectedTopics.class, this::electedTopics);
80 }
81
82 private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
83 Change<Leadership> change = new Change<>(previousLeadership, newLeadership);
84 listeners.values().forEach(listener -> listener.session().publish("change", change));
85 }
86
87 @Override
88 public void delete() {
89 // Close and clear Listeners
90 listeners.values().forEach(Commit::close);
91 listeners.clear();
92 }
93
94 /**
95 * Applies listen commits.
96 *
97 * @param commit listen commit
98 */
99 public void listen(Commit<? extends AtomixLeaderElectorCommands.Listen> commit) {
100 if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
101 commit.close();
102 }
103 }
104
105 /**
106 * Applies unlisten commits.
107 *
108 * @param commit unlisten commit
109 */
110 public void unlisten(Commit<? extends AtomixLeaderElectorCommands.Unlisten> commit) {
111 try {
112 Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(commit.session().id());
113 if (listener != null) {
114 listener.close();
115 }
116 } finally {
117 commit.close();
118 }
119 }
120
121 /**
122 * Applies an {@link AtomixLeaderElectorCommands.Run} commit.
123 * @param commit commit entry
124 * @return topic leader. If no previous leader existed this is the node that just entered the race.
125 */
126 public Leadership run(Commit<? extends AtomixLeaderElectorCommands.Run> commit) {
127 try {
128 String topic = commit.operation().topic();
129 Leadership oldLeadership = leadership(topic);
130 Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
131 elections.compute(topic, (k, v) -> {
132 if (v == null) {
133 return new ElectionState(registration, termCounter(topic)::incrementAndGet);
134 } else {
135 if (!v.isDuplicate(registration)) {
136 return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
137 } else {
138 return v;
139 }
140 }
141 });
142 Leadership newLeadership = leadership(topic);
143
144 if (!Objects.equal(oldLeadership, newLeadership)) {
145 notifyLeadershipChange(oldLeadership, newLeadership);
146 }
147 return newLeadership;
148 } finally {
149 commit.close();
150 }
151 }
152
153 /**
154 * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
155 * @param commit withdraw commit
156 */
157 public void withdraw(Commit<? extends AtomixLeaderElectorCommands.Withdraw> commit) {
158 try {
159 String topic = commit.operation().topic();
160 Leadership oldLeadership = leadership(topic);
161 elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
162 termCounter(topic)::incrementAndGet));
163 Leadership newLeadership = leadership(topic);
164 if (!Objects.equal(oldLeadership, newLeadership)) {
165 notifyLeadershipChange(oldLeadership, newLeadership);
166 }
167 } finally {
168 commit.close();
169 }
170 }
171
172 /**
173 * Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
174 * @param commit anoint commit
175 * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
176 */
177 public boolean anoint(Commit<? extends AtomixLeaderElectorCommands.Anoint> commit) {
178 try {
179 String topic = commit.operation().topic();
180 Leadership oldLeadership = leadership(topic);
181 ElectionState electionState = elections.computeIfPresent(topic,
182 (k, v) -> new ElectionState(v).transferLeadership(commit.operation().nodeId(), termCounter(topic)));
183 Leadership newLeadership = leadership(topic);
184 if (!Objects.equal(oldLeadership, newLeadership)) {
185 notifyLeadershipChange(oldLeadership, newLeadership);
186 }
187 return (electionState != null &&
188 electionState.leader() != null &&
189 commit.operation().nodeId().equals(electionState.leader().nodeId()));
190 } finally {
191 commit.close();
192 }
193 }
194
195 /**
196 * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
197 * @param commit GetLeadership commit
198 * @return leader
199 */
200 public Leadership leadership(Commit<? extends AtomixLeaderElectorCommands.GetLeadership> commit) {
201 String topic = commit.operation().topic();
202 try {
203 return leadership(topic);
204 } finally {
205 commit.close();
206 }
207 }
208
209 /**
210 * Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
211 * @param commit commit entry
212 * @return set of topics for which the node is the leader
213 */
214 public Set<String> electedTopics(Commit<? extends AtomixLeaderElectorCommands.GetElectedTopics> commit) {
215 try {
216 NodeId nodeId = commit.operation().nodeId();
217 return Maps.filterEntries(elections, e -> {
218 Leader leader = leadership(e.getKey()).leader();
219 return leader != null && leader.nodeId().equals(nodeId);
220 }).keySet();
221 } finally {
222 commit.close();
223 }
224 }
225
226 /**
227 * Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
228 * @param commit GetAllLeaderships commit
229 * @return topic to leader mapping
230 */
231 public Map<String, Leadership> allLeaderships(
232 Commit<? extends AtomixLeaderElectorCommands.GetAllLeaderships> commit) {
233 try {
234 return Maps.transformEntries(elections, (k, v) -> leadership(k));
235 } finally {
236 commit.close();
237 }
238 }
239
240 private Leadership leadership(String topic) {
241 return new Leadership(topic,
242 leader(topic),
243 candidates(topic));
244 }
245
246 private Leader leader(String topic) {
247 ElectionState electionState = elections.get(topic);
248 return electionState == null ? null : electionState.leader();
249 }
250
251 private List<NodeId> candidates(String topic) {
252 ElectionState electionState = elections.get(topic);
253 return electionState == null ? new LinkedList<>() : electionState.candidates();
254 }
255
256 private void onSessionEnd(Session session) {
257 Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session);
258 if (listener != null) {
259 listener.close();
260 }
261 Set<String> topics = elections.keySet();
262 topics.forEach(topic -> {
263 Leadership oldLeadership = leadership(topic);
264 elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
265 Leadership newLeadership = leadership(topic);
266 if (!Objects.equal(oldLeadership, newLeadership)) {
267 notifyLeadershipChange(oldLeadership, newLeadership);
268 }
269 });
270 }
271
272 private static class Registration {
273 private final NodeId nodeId;
274 private final long sessionId;
275
276 public Registration(NodeId nodeId, long sessionId) {
277 this.nodeId = nodeId;
278 this.sessionId = sessionId;
279 }
280
281 public NodeId nodeId() {
282 return nodeId;
283 }
284
285 public long sessionId() {
286 return sessionId;
287 }
288
289 @Override
290 public String toString() {
291 return MoreObjects.toStringHelper(getClass())
292 .add("nodeId", nodeId)
293 .add("sessionId", sessionId)
294 .toString();
295 }
296 }
297
298 private static class ElectionState {
299 final Registration leader;
300 final long term;
301 final long termStartTime;
302 final List<Registration> registrations;
303
304 public ElectionState(Registration registration, Supplier<Long> termCounter) {
305 registrations = Arrays.asList(registration);
306 term = termCounter.get();
307 termStartTime = System.currentTimeMillis();
308 leader = registration;
309 }
310
311 public ElectionState(ElectionState other) {
312 registrations = Lists.newArrayList(other.registrations);
313 leader = other.leader;
314 term = other.term;
315 termStartTime = other.termStartTime;
316 }
317
318 public ElectionState(List<Registration> registrations,
319 Registration leader,
320 long term,
321 long termStartTime) {
322 this.registrations = Lists.newArrayList(registrations);
323 this.leader = leader;
324 this.term = term;
325 this.termStartTime = termStartTime;
326 }
327
328 public ElectionState cleanup(Session session, Supplier<Long> termCounter) {
329 Optional<Registration> registration =
330 registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
331 if (registration.isPresent()) {
332 List<Registration> updatedRegistrations =
333 registrations.stream()
334 .filter(r -> r.sessionId() != session.id())
335 .collect(Collectors.toList());
336 if (leader.sessionId() == session.id()) {
337 if (updatedRegistrations.size() > 0) {
338 return new ElectionState(updatedRegistrations,
339 updatedRegistrations.get(0),
340 termCounter.get(),
341 System.currentTimeMillis());
342 } else {
343 return new ElectionState(updatedRegistrations, null, term, termStartTime);
344 }
345 } else {
346 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
347 }
348 } else {
349 return this;
350 }
351 }
352
353 public boolean isDuplicate(Registration registration) {
354 return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
355 }
356
357 public Leader leader() {
358 if (leader == null) {
359 return null;
360 } else {
361 NodeId leaderNodeId = leader.nodeId();
362 return new Leader(leaderNodeId, term, termStartTime);
363 }
364 }
365
366 public List<NodeId> candidates() {
367 return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
368 }
369
370 public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
371 if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
372 List<Registration> updatedRegistrations = new LinkedList<>(registrations);
373 updatedRegistrations.add(registration);
374 boolean newLeader = leader == null;
375 return new ElectionState(updatedRegistrations,
376 newLeader ? registration : leader,
377 newLeader ? termCounter.get() : term,
378 newLeader ? System.currentTimeMillis() : termStartTime);
379 }
380 return this;
381 }
382
383 public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
384 Registration newLeader = registrations.stream()
385 .filter(r -> r.nodeId().equals(nodeId))
386 .findFirst()
387 .orElse(null);
388 if (newLeader != null) {
389 return new ElectionState(registrations,
390 newLeader,
391 termCounter.incrementAndGet(),
392 System.currentTimeMillis());
393 } else {
394 return this;
395 }
396 }
397 }
398
399 @Override
400 public void register(Session session) {
401 }
402
403 @Override
404 public void unregister(Session session) {
405 onSessionEnd(session);
406 }
407
408 @Override
409 public void expire(Session session) {
410 onSessionEnd(session);
411 }
412
413 @Override
414 public void close(Session session) {
415 onSessionEnd(session);
416 }
417
418 @Override
419 public void snapshot(SnapshotWriter writer) {
420 byte[] encodedTermCounters = serializer.encode(termCounters);
421 writer.writeInt(encodedTermCounters.length);
422 writer.write(encodedTermCounters);
423 byte[] encodedElections = serializer.encode(elections);
424 writer.writeInt(encodedElections.length);
425 writer.write(encodedElections);
426 log.info("Took state machine snapshot");
427 }
428
429 @Override
430 public void install(SnapshotReader reader) {
431 int encodedTermCountersSize = reader.readInt();
432 byte[] encodedTermCounters = new byte[encodedTermCountersSize];
433 reader.read(encodedTermCounters);
434 termCounters = serializer.decode(encodedTermCounters);
435 int encodedElectionsSize = reader.readInt();
436 byte[] encodedElections = new byte[encodedElectionsSize];
437 reader.read(encodedElections);
438 elections = serializer.decode(encodedElections);
439 log.info("Reinstated state machine from snapshot");
440 }
441
442 private AtomicLong termCounter(String topic) {
443 return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
444 }
445}