blob: fbf6b60aa3e6c463126a7386528191c1351a4d2e [file] [log] [blame]
Madan Jampani620f70d2016-01-30 22:22:47 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.cluster.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
19
20import java.util.ArrayList;
21import java.util.List;
22import java.util.Map;
23import java.util.stream.Collectors;
24
25import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
31import org.onosproject.cluster.ClusterService;
32import org.onosproject.cluster.Leader;
33import org.onosproject.cluster.Leadership;
34import org.onosproject.cluster.LeadershipEvent;
35import org.onosproject.cluster.LeadershipStore;
36import org.onosproject.cluster.LeadershipStoreDelegate;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.store.AbstractStore;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.service.ConsistentMap;
41import org.onosproject.store.service.MapEventListener;
42import org.onosproject.store.service.Serializer;
43import org.onosproject.store.service.StorageService;
44import org.onosproject.store.service.Versioned;
45import org.slf4j.Logger;
46
47import com.google.common.base.MoreObjects;
48import com.google.common.base.Objects;
49import com.google.common.collect.ImmutableList;
50import com.google.common.collect.ImmutableMap;
51import com.google.common.collect.ImmutableSet;
52import com.google.common.collect.Maps;
53import com.google.common.collect.Sets;
54
55/**
56 * Implementation of {@code LeadershipStore} backed by {@link ConsistentMap}.
57 */
58@Service
Madan Jampani1ffa46b2016-04-01 14:34:01 -070059@Component(immediate = true, enabled = false)
Madan Jampani620f70d2016-01-30 22:22:47 -080060public class DistributedLeadershipStore
61 extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
62 implements LeadershipStore {
63
64 private static final Logger log = getLogger(DistributedLeadershipStore.class);
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected ClusterService clusterService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected StorageService storageService;
71
72 protected NodeId localNodeId;
73 protected ConsistentMap<String, InternalLeadership> leadershipMap;
Madan Jampani72282af2016-02-23 14:23:52 -080074 protected Map<String, Versioned<InternalLeadership>> leadershipCache = Maps.newConcurrentMap();
75
Madan Jampani620f70d2016-01-30 22:22:47 -080076 private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
77 event -> {
78 Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
79 Leadership newValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.newValue()));
80 boolean leaderChanged =
81 !Objects.equal(oldValue == null ? null : oldValue.leader(), newValue.leader());
82 boolean candidatesChanged =
83 !Sets.symmetricDifference(Sets.newHashSet(oldValue == null ?
84 ImmutableSet.<NodeId>of() : oldValue.candidates()),
85 Sets.newHashSet(newValue.candidates())).isEmpty();
86 LeadershipEvent.Type eventType = null;
87 if (leaderChanged && candidatesChanged) {
88 eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
89 }
90 if (leaderChanged && !candidatesChanged) {
91 eventType = LeadershipEvent.Type.LEADER_CHANGED;
92 }
93 if (!leaderChanged && candidatesChanged) {
94 eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
95 }
Madan Jampani72282af2016-02-23 14:23:52 -080096 leadershipCache.compute(event.key(), (k, v) -> {
97 if (v == null || v.version() < event.newValue().version()) {
98 return event.newValue();
99 }
100 return v;
101 });
Madan Jampani620f70d2016-01-30 22:22:47 -0800102 notifyDelegate(new LeadershipEvent(eventType, newValue));
103 };
104
105 @Activate
106 public void activate() {
107 localNodeId = clusterService.getLocalNode().id();
108 leadershipMap = storageService.<String, InternalLeadership>consistentMapBuilder()
109 .withName("onos-leadership")
110 .withPartitionsDisabled()
111 .withRelaxedReadConsistency()
112 .withSerializer(Serializer.using(KryoNamespaces.API, InternalLeadership.class))
113 .build();
Madan Jampani72282af2016-02-23 14:23:52 -0800114 leadershipMap.entrySet().forEach(e -> leadershipCache.put(e.getKey(), e.getValue()));
Madan Jampani620f70d2016-01-30 22:22:47 -0800115 leadershipMap.addListener(leadershipChangeListener);
116 log.info("Started");
117 }
118
119 @Deactivate
120 public void deactivate() {
121 leadershipMap.removeListener(leadershipChangeListener);
122 log.info("Stopped");
123 }
124
125 @Override
126 public Leadership addRegistration(String topic) {
127 Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
128 v -> v == null || !v.candidates().contains(localNodeId),
129 (k, v) -> {
130 if (v == null || v.candidates().isEmpty()) {
131 return new InternalLeadership(topic,
132 localNodeId,
133 v == null ? 1 : v.term() + 1,
134 System.currentTimeMillis(),
135 ImmutableList.of(localNodeId));
136 }
137 List<NodeId> newCandidates = new ArrayList<>(v.candidates());
138 newCandidates.add(localNodeId);
139 return new InternalLeadership(topic, v.leader(), v.term(), v.termStartTime(), newCandidates);
140 });
141 return InternalLeadership.toLeadership(Versioned.valueOrNull(internalLeadership));
142 }
143
144 @Override
145 public void removeRegistration(String topic) {
146 removeRegistration(topic, localNodeId);
147 }
148
149 private void removeRegistration(String topic, NodeId nodeId) {
150 leadershipMap.computeIf(topic,
151 v -> v != null && v.candidates().contains(nodeId),
152 (k, v) -> {
153 List<NodeId> newCandidates = v.candidates()
154 .stream()
155 .filter(id -> !nodeId.equals(id))
156 .collect(Collectors.toList());
157 NodeId newLeader = nodeId.equals(v.leader()) ?
158 newCandidates.size() > 0 ? newCandidates.get(0) : null : v.leader();
159 long newTerm = newLeader == null || Objects.equal(newLeader, v.leader()) ?
160 v.term() : v.term() + 1;
161 long newTermStartTime = newLeader == null || Objects.equal(newLeader, v.leader()) ?
162 v.termStartTime() : System.currentTimeMillis();
163 return new InternalLeadership(topic, newLeader, newTerm, newTermStartTime, newCandidates);
164 });
165 }
166
167 @Override
168 public void removeRegistration(NodeId nodeId) {
169 leadershipMap.entrySet()
170 .stream()
171 .filter(e -> e.getValue().value().candidates().contains(nodeId))
172 .map(e -> e.getKey())
173 .forEach(topic -> this.removeRegistration(topic, nodeId));
174 }
175
176 @Override
177 public boolean moveLeadership(String topic, NodeId toNodeId) {
178 Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
179 v -> v != null &&
180 v.candidates().contains(toNodeId) &&
181 !Objects.equal(v.leader(), toNodeId),
182 (k, v) -> {
183 List<NodeId> newCandidates = new ArrayList<>();
184 newCandidates.add(toNodeId);
185 newCandidates.addAll(v.candidates()
186 .stream()
187 .filter(id -> !toNodeId.equals(id))
188 .collect(Collectors.toList()));
189 return new InternalLeadership(topic,
190 toNodeId,
191 v.term() + 1,
192 System.currentTimeMillis(),
193 newCandidates);
194 });
195 return Objects.equal(toNodeId, Versioned.valueOrNull(internalLeadership).leader());
196 }
197
198 @Override
199 public boolean makeTopCandidate(String topic, NodeId nodeId) {
200 Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
201 v -> v != null &&
202 v.candidates().contains(nodeId) &&
203 !v.candidates().get(0).equals(nodeId),
204 (k, v) -> {
205 List<NodeId> newCandidates = new ArrayList<>();
206 newCandidates.add(nodeId);
207 newCandidates.addAll(v.candidates()
208 .stream()
209 .filter(id -> !nodeId.equals(id))
210 .collect(Collectors.toList()));
211 return new InternalLeadership(topic,
212 v.leader(),
213 v.term(),
214 System.currentTimeMillis(),
215 newCandidates);
216 });
217 return internalLeadership != null && nodeId.equals(internalLeadership.value().candidates().get(0));
218 }
219
220 @Override
221 public Leadership getLeadership(String topic) {
Madan Jampanif1d87ca2016-02-23 17:24:24 -0800222 InternalLeadership internalLeadership = Versioned.valueOrNull(leadershipMap.get(topic));
Madan Jampani72282af2016-02-23 14:23:52 -0800223 return internalLeadership == null ? null : internalLeadership.asLeadership();
Madan Jampani620f70d2016-01-30 22:22:47 -0800224 }
225
226 @Override
227 public Map<String, Leadership> getLeaderships() {
Madan Jampani72282af2016-02-23 14:23:52 -0800228 return ImmutableMap.copyOf(Maps.transformValues(leadershipCache, v -> v.value().asLeadership()));
Madan Jampani620f70d2016-01-30 22:22:47 -0800229 }
230
231 private static class InternalLeadership {
232 private final String topic;
233 private final NodeId leader;
234 private final long term;
235 private final long termStartTime;
236 private final List<NodeId> candidates;
237
238 public InternalLeadership(String topic,
239 NodeId leader,
240 long term,
241 long termStartTime,
242 List<NodeId> candidates) {
243 this.topic = topic;
244 this.leader = leader;
245 this.term = term;
246 this.termStartTime = termStartTime;
247 this.candidates = ImmutableList.copyOf(candidates);
248 }
249
250 public NodeId leader() {
251 return this.leader;
252 }
253
254 public long term() {
255 return term;
256 }
257
258 public long termStartTime() {
259 return termStartTime;
260 }
261
262 public List<NodeId> candidates() {
263 return candidates;
264 }
265
266 public Leadership asLeadership() {
267 return new Leadership(topic, leader == null ?
268 null : new Leader(leader, term, termStartTime), candidates);
269 }
270
271 public static Leadership toLeadership(InternalLeadership internalLeadership) {
272 return internalLeadership == null ? null : internalLeadership.asLeadership();
273 }
274
275 @Override
276 public String toString() {
277 return MoreObjects.toStringHelper(getClass())
278 .add("leader", leader)
279 .add("term", term)
280 .add("termStartTime", termStartTime)
281 .add("candidates", candidates)
282 .toString();
283 }
284 }
285}