blob: b259cd13a94c0e184a5857be3ebe322cf204f8c5 [file] [log] [blame]
Madan Jampani5e5b3d62016-02-01 16:03:33 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Madan Jampani5e5b3d62016-02-01 16:03:33 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
Madan Jampani5e5b3d62016-02-01 16:03:33 -080018import java.util.Arrays;
19import java.util.HashMap;
20import java.util.LinkedHashMap;
21import java.util.LinkedList;
22import java.util.List;
23import java.util.Map;
24import java.util.Optional;
25import java.util.Set;
26import java.util.concurrent.atomic.AtomicLong;
27import java.util.function.Supplier;
28import java.util.stream.Collectors;
29
Jordan Halterman2bf177c2017-06-29 01:49:08 -070030import com.google.common.base.MoreObjects;
31import com.google.common.base.Objects;
32import com.google.common.base.Throwables;
33import com.google.common.collect.ImmutableSet;
34import com.google.common.collect.Lists;
35import com.google.common.collect.Maps;
36import com.google.common.collect.Sets;
37import io.atomix.protocols.raft.service.AbstractRaftService;
38import io.atomix.protocols.raft.service.Commit;
39import io.atomix.protocols.raft.service.RaftServiceExecutor;
40import io.atomix.protocols.raft.session.RaftSession;
41import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
42import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
43import org.onlab.util.KryoNamespace;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080044import org.onosproject.cluster.Leader;
45import org.onosproject.cluster.Leadership;
46import org.onosproject.cluster.NodeId;
47import org.onosproject.event.Change;
Jordan Halterman2bf177c2017-06-29 01:49:08 -070048import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Anoint;
49import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Evict;
50import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GetElectedTopics;
51import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GetLeadership;
52import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Promote;
53import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Run;
54import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Withdraw;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080055import org.onosproject.store.service.Serializer;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080056
Jordan Halterman2bf177c2017-06-29 01:49:08 -070057import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorEvents.CHANGE;
58import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.ADD_LISTENER;
59import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.ANOINT;
60import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.EVICT;
61import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_ALL_LEADERSHIPS;
62import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_ELECTED_TOPICS;
63import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_LEADERSHIP;
64import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.PROMOTE;
65import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.REMOVE_LISTENER;
66import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.RUN;
67import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.WITHDRAW;
Madan Jampani5e5b3d62016-02-01 16:03:33 -080068
69/**
70 * State machine for {@link AtomixLeaderElector} resource.
71 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -070072public class AtomixLeaderElectorService extends AbstractRaftService {
Madan Jampani5e5b3d62016-02-01 16:03:33 -080073
Jordan Halterman2bf177c2017-06-29 01:49:08 -070074 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
75 .register(AtomixLeaderElectorOperations.NAMESPACE)
76 .register(AtomixLeaderElectorEvents.NAMESPACE)
77 .register(ElectionState.class)
78 .register(Registration.class)
79 .register(new LinkedHashMap<>().keySet().getClass())
80 .build());
81
Madan Jampani5e5b3d62016-02-01 16:03:33 -080082 private Map<String, AtomicLong> termCounters = new HashMap<>();
83 private Map<String, ElectionState> elections = new HashMap<>();
Jordan Halterman2bf177c2017-06-29 01:49:08 -070084 private Map<Long, RaftSession> listeners = new LinkedHashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -080085
Jordan Halterman2bf177c2017-06-29 01:49:08 -070086 @Override
87 public void snapshot(SnapshotWriter writer) {
88 writer.writeObject(Sets.newHashSet(listeners.keySet()), SERIALIZER::encode);
89 writer.writeObject(termCounters, SERIALIZER::encode);
90 writer.writeObject(elections, SERIALIZER::encode);
Jordan Halterman6807c8f2017-08-28 20:58:24 -070091 logger().debug("Took state machine snapshot");
Madan Jampani86cb2432016-02-17 11:07:56 -080092 }
93
Madan Jampani5e5b3d62016-02-01 16:03:33 -080094 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -070095 public void install(SnapshotReader reader) {
96 listeners = new LinkedHashMap<>();
97 for (Long sessionId : reader.<Set<Long>>readObject(SERIALIZER::decode)) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -070098 listeners.put(sessionId, sessions().getSession(sessionId));
Jordan Halterman2bf177c2017-06-29 01:49:08 -070099 }
100 termCounters = reader.readObject(SERIALIZER::decode);
101 elections = reader.readObject(SERIALIZER::decode);
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700102 logger().debug("Reinstated state machine from snapshot");
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700103 }
104
105 @Override
106 protected void configure(RaftServiceExecutor executor) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800107 // Notification
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700108 executor.register(ADD_LISTENER, this::listen);
109 executor.register(REMOVE_LISTENER, this::unlisten);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800110 // Commands
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700111 executor.register(RUN, SERIALIZER::decode, this::run, SERIALIZER::encode);
112 executor.register(WITHDRAW, SERIALIZER::decode, this::withdraw);
113 executor.register(ANOINT, SERIALIZER::decode, this::anoint, SERIALIZER::encode);
114 executor.register(PROMOTE, SERIALIZER::decode, this::promote, SERIALIZER::encode);
115 executor.register(EVICT, SERIALIZER::decode, this::evict);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800116 // Queries
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700117 executor.register(GET_LEADERSHIP, SERIALIZER::decode, this::getLeadership, SERIALIZER::encode);
118 executor.register(GET_ALL_LEADERSHIPS, this::allLeaderships, SERIALIZER::encode);
119 executor.register(GET_ELECTED_TOPICS, SERIALIZER::decode, this::electedTopics, SERIALIZER::encode);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800120 }
121
122 private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800123 notifyLeadershipChanges(Lists.newArrayList(new Change<>(previousLeadership, newLeadership)));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800124 }
125
126 private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
127 if (changes.isEmpty()) {
128 return;
129 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700130 listeners.values().forEach(session -> session.publish(CHANGE, SERIALIZER::encode, changes));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800131 }
132
133 /**
134 * Applies listen commits.
135 *
136 * @param commit listen commit
137 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700138 public void listen(Commit<Void> commit) {
139 listeners.put(commit.session().sessionId().id(), commit.session());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800140 }
141
142 /**
143 * Applies unlisten commits.
144 *
145 * @param commit unlisten commit
146 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700147 public void unlisten(Commit<Void> commit) {
148 listeners.remove(commit.session().sessionId().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800149 }
150
151 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700152 * Applies an {@link AtomixLeaderElectorOperations.Run} commit.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800153 * @param commit commit entry
154 * @return topic leader. If no previous leader existed this is the node that just entered the race.
155 */
Madan Jampanifc981772016-02-16 09:46:42 -0800156 public Leadership run(Commit<? extends Run> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800157 try {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700158 String topic = commit.value().topic();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800159 Leadership oldLeadership = leadership(topic);
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700160 Registration registration = new Registration(commit.value().nodeId(), commit.session().sessionId().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800161 elections.compute(topic, (k, v) -> {
162 if (v == null) {
163 return new ElectionState(registration, termCounter(topic)::incrementAndGet);
164 } else {
165 if (!v.isDuplicate(registration)) {
166 return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
167 } else {
168 return v;
169 }
170 }
171 });
172 Leadership newLeadership = leadership(topic);
173
174 if (!Objects.equal(oldLeadership, newLeadership)) {
175 notifyLeadershipChange(oldLeadership, newLeadership);
176 }
177 return newLeadership;
Madan Jampaniea98f412016-06-22 09:05:40 -0700178 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700179 logger().error("State machine operation failed", e);
Madan Jampaniea98f412016-06-22 09:05:40 -0700180 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800181 }
182 }
183
184 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700185 * Applies an {@link AtomixLeaderElectorOperations.Withdraw} commit.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800186 * @param commit withdraw commit
187 */
Madan Jampanifc981772016-02-16 09:46:42 -0800188 public void withdraw(Commit<? extends Withdraw> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800189 try {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700190 String topic = commit.value().topic();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800191 Leadership oldLeadership = leadership(topic);
192 elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700193 termCounter(topic)::incrementAndGet));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800194 Leadership newLeadership = leadership(topic);
195 if (!Objects.equal(oldLeadership, newLeadership)) {
196 notifyLeadershipChange(oldLeadership, newLeadership);
197 }
Madan Jampaniea98f412016-06-22 09:05:40 -0700198 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700199 logger().error("State machine operation failed", e);
Madan Jampaniea98f412016-06-22 09:05:40 -0700200 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800201 }
202 }
203
204 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700205 * Applies an {@link AtomixLeaderElectorOperations.Anoint} commit.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800206 * @param commit anoint commit
207 * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
208 */
Madan Jampanifc981772016-02-16 09:46:42 -0800209 public boolean anoint(Commit<? extends Anoint> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800210 try {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700211 String topic = commit.value().topic();
212 NodeId nodeId = commit.value().nodeId();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800213 Leadership oldLeadership = leadership(topic);
214 ElectionState electionState = elections.computeIfPresent(topic,
Madan Jampanic94b4852016-02-23 18:18:37 -0800215 (k, v) -> v.transferLeadership(nodeId, termCounter(topic)));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800216 Leadership newLeadership = leadership(topic);
217 if (!Objects.equal(oldLeadership, newLeadership)) {
218 notifyLeadershipChange(oldLeadership, newLeadership);
219 }
220 return (electionState != null &&
221 electionState.leader() != null &&
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700222 commit.value().nodeId().equals(electionState.leader().nodeId()));
Madan Jampaniea98f412016-06-22 09:05:40 -0700223 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700224 logger().error("State machine operation failed", e);
Madan Jampaniea98f412016-06-22 09:05:40 -0700225 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800226 }
227 }
228
229 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700230 * Applies an {@link AtomixLeaderElectorOperations.Promote} commit.
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800231 * @param commit promote commit
232 * @return {@code true} if changes desired end state is achieved.
233 */
234 public boolean promote(Commit<? extends Promote> commit) {
235 try {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700236 String topic = commit.value().topic();
237 NodeId nodeId = commit.value().nodeId();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800238 Leadership oldLeadership = leadership(topic);
239 if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
240 return false;
241 }
Madan Jampanic94b4852016-02-23 18:18:37 -0800242 elections.computeIfPresent(topic, (k, v) -> v.promote(nodeId));
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800243 Leadership newLeadership = leadership(topic);
244 if (!Objects.equal(oldLeadership, newLeadership)) {
245 notifyLeadershipChange(oldLeadership, newLeadership);
246 }
247 return true;
Madan Jampaniea98f412016-06-22 09:05:40 -0700248 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700249 logger().error("State machine operation failed", e);
Madan Jampaniea98f412016-06-22 09:05:40 -0700250 throw Throwables.propagate(e);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800251 }
252 }
253
254 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700255 * Applies an {@link AtomixLeaderElectorOperations.Evict} commit.
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800256 * @param commit evict commit
257 */
258 public void evict(Commit<? extends Evict> commit) {
259 try {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800260 List<Change<Leadership>> changes = Lists.newArrayList();
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700261 NodeId nodeId = commit.value().nodeId();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800262 Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
263 topics.forEach(topic -> {
264 Leadership oldLeadership = leadership(topic);
265 elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
266 Leadership newLeadership = leadership(topic);
267 if (!Objects.equal(oldLeadership, newLeadership)) {
268 changes.add(new Change<>(oldLeadership, newLeadership));
269 }
270 });
271 notifyLeadershipChanges(changes);
Madan Jampaniea98f412016-06-22 09:05:40 -0700272 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700273 logger().error("State machine operation failed", e);
Madan Jampaniea98f412016-06-22 09:05:40 -0700274 throw Throwables.propagate(e);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800275 }
276 }
277
278 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700279 * Applies an {@link AtomixLeaderElectorOperations.GetLeadership} commit.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800280 * @param commit GetLeadership commit
281 * @return leader
282 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700283 public Leadership getLeadership(Commit<? extends GetLeadership> commit) {
284 String topic = commit.value().topic();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800285 try {
286 return leadership(topic);
Madan Jampaniea98f412016-06-22 09:05:40 -0700287 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700288 logger().error("State machine operation failed", e);
Madan Jampaniea98f412016-06-22 09:05:40 -0700289 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800290 }
291 }
292
293 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700294 * Applies an {@link AtomixLeaderElectorOperations.GetElectedTopics} commit.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800295 * @param commit commit entry
296 * @return set of topics for which the node is the leader
297 */
Madan Jampanifc981772016-02-16 09:46:42 -0800298 public Set<String> electedTopics(Commit<? extends GetElectedTopics> commit) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800299 try {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700300 NodeId nodeId = commit.value().nodeId();
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700301 return ImmutableSet.copyOf(Maps.filterEntries(elections, e -> {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800302 Leader leader = leadership(e.getKey()).leader();
303 return leader != null && leader.nodeId().equals(nodeId);
Aaron Kruglikovc0c27c02016-06-07 16:05:00 -0700304 }).keySet());
Madan Jampaniea98f412016-06-22 09:05:40 -0700305 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700306 logger().error("State machine operation failed", e);
Madan Jampaniea98f412016-06-22 09:05:40 -0700307 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800308 }
309 }
310
311 /**
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700312 * Applies an {@link AtomixLeaderElectorOperations#GET_ALL_LEADERSHIPS} commit.
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800313 * @param commit GetAllLeaderships commit
314 * @return topic to leader mapping
315 */
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700316 public Map<String, Leadership> allLeaderships(Commit<Void> commit) {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700317 Map<String, Leadership> result = new HashMap<>();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800318 try {
Madan Jampani630e7ac2016-05-31 11:34:05 -0700319 result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
320 return result;
Madan Jampaniea98f412016-06-22 09:05:40 -0700321 } catch (Exception e) {
Jordan Halterman6807c8f2017-08-28 20:58:24 -0700322 logger().error("State machine operation failed", e);
Madan Jampaniea98f412016-06-22 09:05:40 -0700323 throw Throwables.propagate(e);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800324 }
325 }
326
327 private Leadership leadership(String topic) {
328 return new Leadership(topic,
329 leader(topic),
330 candidates(topic));
331 }
332
333 private Leader leader(String topic) {
334 ElectionState electionState = elections.get(topic);
335 return electionState == null ? null : electionState.leader();
336 }
337
338 private List<NodeId> candidates(String topic) {
339 ElectionState electionState = elections.get(topic);
340 return electionState == null ? new LinkedList<>() : electionState.candidates();
341 }
342
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700343 private void onSessionEnd(RaftSession session) {
344 listeners.remove(session.sessionId().id());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800345 Set<String> topics = elections.keySet();
Madan Jampanidb2afd32016-02-23 16:26:45 -0800346 List<Change<Leadership>> changes = Lists.newArrayList();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800347 topics.forEach(topic -> {
348 Leadership oldLeadership = leadership(topic);
349 elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
350 Leadership newLeadership = leadership(topic);
351 if (!Objects.equal(oldLeadership, newLeadership)) {
Madan Jampanidb2afd32016-02-23 16:26:45 -0800352 changes.add(new Change<>(oldLeadership, newLeadership));
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800353 }
354 });
Madan Jampanidb2afd32016-02-23 16:26:45 -0800355 notifyLeadershipChanges(changes);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800356 }
357
358 private static class Registration {
359 private final NodeId nodeId;
360 private final long sessionId;
361
362 public Registration(NodeId nodeId, long sessionId) {
363 this.nodeId = nodeId;
364 this.sessionId = sessionId;
365 }
366
367 public NodeId nodeId() {
368 return nodeId;
369 }
370
371 public long sessionId() {
372 return sessionId;
373 }
374
375 @Override
376 public String toString() {
377 return MoreObjects.toStringHelper(getClass())
378 .add("nodeId", nodeId)
379 .add("sessionId", sessionId)
380 .toString();
381 }
382 }
383
384 private static class ElectionState {
385 final Registration leader;
386 final long term;
387 final long termStartTime;
388 final List<Registration> registrations;
389
390 public ElectionState(Registration registration, Supplier<Long> termCounter) {
391 registrations = Arrays.asList(registration);
392 term = termCounter.get();
393 termStartTime = System.currentTimeMillis();
394 leader = registration;
395 }
396
397 public ElectionState(ElectionState other) {
398 registrations = Lists.newArrayList(other.registrations);
399 leader = other.leader;
400 term = other.term;
401 termStartTime = other.termStartTime;
402 }
403
404 public ElectionState(List<Registration> registrations,
405 Registration leader,
406 long term,
407 long termStartTime) {
408 this.registrations = Lists.newArrayList(registrations);
409 this.leader = leader;
410 this.term = term;
411 this.termStartTime = termStartTime;
412 }
413
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700414 public ElectionState cleanup(RaftSession session, Supplier<Long> termCounter) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800415 Optional<Registration> registration =
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700416 registrations.stream().filter(r -> r.sessionId() == session.sessionId().id()).findFirst();
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800417 if (registration.isPresent()) {
418 List<Registration> updatedRegistrations =
419 registrations.stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700420 .filter(r -> r.sessionId() != session.sessionId().id())
421 .collect(Collectors.toList());
422 if (leader.sessionId() == session.sessionId().id()) {
Jon Hallcbd1b392017-01-18 20:15:44 -0800423 if (!updatedRegistrations.isEmpty()) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800424 return new ElectionState(updatedRegistrations,
425 updatedRegistrations.get(0),
426 termCounter.get(),
427 System.currentTimeMillis());
428 } else {
429 return new ElectionState(updatedRegistrations, null, term, termStartTime);
430 }
431 } else {
432 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
433 }
434 } else {
435 return this;
436 }
437 }
438
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800439 public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
440 Optional<Registration> registration =
441 registrations.stream().filter(r -> r.nodeId.equals(nodeId)).findFirst();
442 if (registration.isPresent()) {
443 List<Registration> updatedRegistrations =
444 registrations.stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700445 .filter(r -> !r.nodeId().equals(nodeId))
446 .collect(Collectors.toList());
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800447 if (leader.nodeId().equals(nodeId)) {
Jon Hallcbd1b392017-01-18 20:15:44 -0800448 if (!updatedRegistrations.isEmpty()) {
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800449 return new ElectionState(updatedRegistrations,
450 updatedRegistrations.get(0),
451 termCounter.get(),
452 System.currentTimeMillis());
453 } else {
454 return new ElectionState(updatedRegistrations, null, term, termStartTime);
455 }
456 } else {
457 return new ElectionState(updatedRegistrations, leader, term, termStartTime);
458 }
459 } else {
460 return this;
461 }
462 }
463
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800464 public boolean isDuplicate(Registration registration) {
465 return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
466 }
467
468 public Leader leader() {
469 if (leader == null) {
470 return null;
471 } else {
472 NodeId leaderNodeId = leader.nodeId();
473 return new Leader(leaderNodeId, term, termStartTime);
474 }
475 }
476
477 public List<NodeId> candidates() {
478 return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
479 }
480
481 public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
482 if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
483 List<Registration> updatedRegistrations = new LinkedList<>(registrations);
484 updatedRegistrations.add(registration);
485 boolean newLeader = leader == null;
486 return new ElectionState(updatedRegistrations,
487 newLeader ? registration : leader,
488 newLeader ? termCounter.get() : term,
489 newLeader ? System.currentTimeMillis() : termStartTime);
490 }
491 return this;
492 }
493
494 public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
495 Registration newLeader = registrations.stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700496 .filter(r -> r.nodeId().equals(nodeId))
497 .findFirst()
498 .orElse(null);
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800499 if (newLeader != null) {
500 return new ElectionState(registrations,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700501 newLeader,
502 termCounter.incrementAndGet(),
503 System.currentTimeMillis());
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800504 } else {
505 return this;
506 }
507 }
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800508
509 public ElectionState promote(NodeId nodeId) {
510 Registration registration = registrations.stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700511 .filter(r -> r.nodeId().equals(nodeId))
512 .findFirst()
513 .orElse(null);
Madan Jampanic94b4852016-02-23 18:18:37 -0800514 List<Registration> updatedRegistrations = Lists.newArrayList();
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800515 updatedRegistrations.add(registration);
516 registrations.stream()
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700517 .filter(r -> !r.nodeId().equals(nodeId))
518 .forEach(updatedRegistrations::add);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800519 return new ElectionState(updatedRegistrations,
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700520 leader,
521 term,
522 termStartTime);
Madan Jampani0c0cdc62016-02-22 16:54:06 -0800523
524 }
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800525 }
526
527 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700528 public void onExpire(RaftSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800529 onSessionEnd(session);
530 }
531
532 @Override
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700533 public void onClose(RaftSession session) {
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800534 onSessionEnd(session);
535 }
536
Madan Jampani5e5b3d62016-02-01 16:03:33 -0800537 private AtomicLong termCounter(String topic) {
538 return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
539 }
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700540}