/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.cluster.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
19
20import java.util.ArrayList;
21import java.util.List;
22import java.util.Map;
23import java.util.stream.Collectors;
24
25import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
31import org.onosproject.cluster.ClusterService;
32import org.onosproject.cluster.Leader;
33import org.onosproject.cluster.Leadership;
34import org.onosproject.cluster.LeadershipEvent;
35import org.onosproject.cluster.LeadershipStore;
36import org.onosproject.cluster.LeadershipStoreDelegate;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.store.AbstractStore;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.service.ConsistentMap;
41import org.onosproject.store.service.MapEventListener;
42import org.onosproject.store.service.Serializer;
43import org.onosproject.store.service.StorageService;
44import org.onosproject.store.service.Versioned;
45import org.slf4j.Logger;
46
47import com.google.common.base.MoreObjects;
48import com.google.common.base.Objects;
49import com.google.common.collect.ImmutableList;
50import com.google.common.collect.ImmutableMap;
51import com.google.common.collect.ImmutableSet;
52import com.google.common.collect.Maps;
53import com.google.common.collect.Sets;
54
55/**
56 * Implementation of {@code LeadershipStore} backed by {@link ConsistentMap}.
57 */
58@Service
59@Component(immediate = true, enabled = true)
60public class DistributedLeadershipStore
61 extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
62 implements LeadershipStore {
63
64 private static final Logger log = getLogger(DistributedLeadershipStore.class);
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected ClusterService clusterService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected StorageService storageService;
71
72 protected NodeId localNodeId;
73 protected ConsistentMap<String, InternalLeadership> leadershipMap;
74 private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
75 event -> {
76 Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
77 Leadership newValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.newValue()));
78 boolean leaderChanged =
79 !Objects.equal(oldValue == null ? null : oldValue.leader(), newValue.leader());
80 boolean candidatesChanged =
81 !Sets.symmetricDifference(Sets.newHashSet(oldValue == null ?
82 ImmutableSet.<NodeId>of() : oldValue.candidates()),
83 Sets.newHashSet(newValue.candidates())).isEmpty();
84 LeadershipEvent.Type eventType = null;
85 if (leaderChanged && candidatesChanged) {
86 eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
87 }
88 if (leaderChanged && !candidatesChanged) {
89 eventType = LeadershipEvent.Type.LEADER_CHANGED;
90 }
91 if (!leaderChanged && candidatesChanged) {
92 eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
93 }
94 notifyDelegate(new LeadershipEvent(eventType, newValue));
95 };
96
97 @Activate
98 public void activate() {
99 localNodeId = clusterService.getLocalNode().id();
100 leadershipMap = storageService.<String, InternalLeadership>consistentMapBuilder()
101 .withName("onos-leadership")
102 .withPartitionsDisabled()
103 .withRelaxedReadConsistency()
104 .withSerializer(Serializer.using(KryoNamespaces.API, InternalLeadership.class))
105 .build();
106 leadershipMap.addListener(leadershipChangeListener);
107 log.info("Started");
108 }
109
110 @Deactivate
111 public void deactivate() {
112 leadershipMap.removeListener(leadershipChangeListener);
113 log.info("Stopped");
114 }
115
116 @Override
117 public Leadership addRegistration(String topic) {
118 Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
119 v -> v == null || !v.candidates().contains(localNodeId),
120 (k, v) -> {
121 if (v == null || v.candidates().isEmpty()) {
122 return new InternalLeadership(topic,
123 localNodeId,
124 v == null ? 1 : v.term() + 1,
125 System.currentTimeMillis(),
126 ImmutableList.of(localNodeId));
127 }
128 List<NodeId> newCandidates = new ArrayList<>(v.candidates());
129 newCandidates.add(localNodeId);
130 return new InternalLeadership(topic, v.leader(), v.term(), v.termStartTime(), newCandidates);
131 });
132 return InternalLeadership.toLeadership(Versioned.valueOrNull(internalLeadership));
133 }
134
135 @Override
136 public void removeRegistration(String topic) {
137 removeRegistration(topic, localNodeId);
138 }
139
140 private void removeRegistration(String topic, NodeId nodeId) {
141 leadershipMap.computeIf(topic,
142 v -> v != null && v.candidates().contains(nodeId),
143 (k, v) -> {
144 List<NodeId> newCandidates = v.candidates()
145 .stream()
146 .filter(id -> !nodeId.equals(id))
147 .collect(Collectors.toList());
148 NodeId newLeader = nodeId.equals(v.leader()) ?
149 newCandidates.size() > 0 ? newCandidates.get(0) : null : v.leader();
150 long newTerm = newLeader == null || Objects.equal(newLeader, v.leader()) ?
151 v.term() : v.term() + 1;
152 long newTermStartTime = newLeader == null || Objects.equal(newLeader, v.leader()) ?
153 v.termStartTime() : System.currentTimeMillis();
154 return new InternalLeadership(topic, newLeader, newTerm, newTermStartTime, newCandidates);
155 });
156 }
157
158 @Override
159 public void removeRegistration(NodeId nodeId) {
160 leadershipMap.entrySet()
161 .stream()
162 .filter(e -> e.getValue().value().candidates().contains(nodeId))
163 .map(e -> e.getKey())
164 .forEach(topic -> this.removeRegistration(topic, nodeId));
165 }
166
167 @Override
168 public boolean moveLeadership(String topic, NodeId toNodeId) {
169 Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
170 v -> v != null &&
171 v.candidates().contains(toNodeId) &&
172 !Objects.equal(v.leader(), toNodeId),
173 (k, v) -> {
174 List<NodeId> newCandidates = new ArrayList<>();
175 newCandidates.add(toNodeId);
176 newCandidates.addAll(v.candidates()
177 .stream()
178 .filter(id -> !toNodeId.equals(id))
179 .collect(Collectors.toList()));
180 return new InternalLeadership(topic,
181 toNodeId,
182 v.term() + 1,
183 System.currentTimeMillis(),
184 newCandidates);
185 });
186 return Objects.equal(toNodeId, Versioned.valueOrNull(internalLeadership).leader());
187 }
188
189 @Override
190 public boolean makeTopCandidate(String topic, NodeId nodeId) {
191 Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
192 v -> v != null &&
193 v.candidates().contains(nodeId) &&
194 !v.candidates().get(0).equals(nodeId),
195 (k, v) -> {
196 List<NodeId> newCandidates = new ArrayList<>();
197 newCandidates.add(nodeId);
198 newCandidates.addAll(v.candidates()
199 .stream()
200 .filter(id -> !nodeId.equals(id))
201 .collect(Collectors.toList()));
202 return new InternalLeadership(topic,
203 v.leader(),
204 v.term(),
205 System.currentTimeMillis(),
206 newCandidates);
207 });
208 return internalLeadership != null && nodeId.equals(internalLeadership.value().candidates().get(0));
209 }
210
211 @Override
212 public Leadership getLeadership(String topic) {
213 return InternalLeadership.toLeadership(Versioned.valueOrNull(leadershipMap.get(topic)));
214 }
215
216 @Override
217 public Map<String, Leadership> getLeaderships() {
218 Map<String, Leadership> leaderships = Maps.newHashMap();
219 leadershipMap.entrySet().forEach(e -> {
220 leaderships.put(e.getKey(), e.getValue().value().asLeadership());
221 });
222 return ImmutableMap.copyOf(leaderships);
223 }
224
225 private static class InternalLeadership {
226 private final String topic;
227 private final NodeId leader;
228 private final long term;
229 private final long termStartTime;
230 private final List<NodeId> candidates;
231
232 public InternalLeadership(String topic,
233 NodeId leader,
234 long term,
235 long termStartTime,
236 List<NodeId> candidates) {
237 this.topic = topic;
238 this.leader = leader;
239 this.term = term;
240 this.termStartTime = termStartTime;
241 this.candidates = ImmutableList.copyOf(candidates);
242 }
243
244 public NodeId leader() {
245 return this.leader;
246 }
247
248 public long term() {
249 return term;
250 }
251
252 public long termStartTime() {
253 return termStartTime;
254 }
255
256 public List<NodeId> candidates() {
257 return candidates;
258 }
259
260 public Leadership asLeadership() {
261 return new Leadership(topic, leader == null ?
262 null : new Leader(leader, term, termStartTime), candidates);
263 }
264
265 public static Leadership toLeadership(InternalLeadership internalLeadership) {
266 return internalLeadership == null ? null : internalLeadership.asLeadership();
267 }
268
269 @Override
270 public String toString() {
271 return MoreObjects.toStringHelper(getClass())
272 .add("leader", leader)
273 .add("term", term)
274 .add("termStartTime", termStartTime)
275 .add("candidates", candidates)
276 .toString();
277 }
278 }
279}