blob: ad1d0a26ea89f7b07bfe312ec981eeb184154258 [file] [log] [blame]
Madan Jampani7e55c662016-02-15 21:13:53 -08001/*
2 * Copyright 2016 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.cluster.impl;
17
18import static org.slf4j.LoggerFactory.getLogger;
19
20import java.util.Map;
21import java.util.Objects;
22import java.util.function.Consumer;
23
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Reference;
28import org.apache.felix.scr.annotations.ReferenceCardinality;
29import org.apache.felix.scr.annotations.Service;
30import org.onlab.util.Tools;
31import org.onosproject.cluster.ClusterService;
32import org.onosproject.cluster.Leadership;
33import org.onosproject.cluster.LeadershipEvent;
34import org.onosproject.cluster.LeadershipStore;
35import org.onosproject.cluster.LeadershipStoreDelegate;
36import org.onosproject.cluster.NodeId;
37import org.onosproject.event.Change;
38import org.onosproject.store.AbstractStore;
39import org.onosproject.store.service.LeaderElector;
40import org.onosproject.store.service.StorageException;
41import org.onosproject.store.service.StorageService;
42import org.slf4j.Logger;
43
44import com.google.common.collect.ImmutableMap;
45import com.google.common.collect.Maps;
46
47/**
48 * Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
49 * primitive.
50 */
51@Service
52@Component(immediate = true, enabled = false)
53public class NewDistributedLeadershipStore
54 extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
55 implements LeadershipStore {
56
57 private final Logger log = getLogger(getClass());
58
59 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
60 protected ClusterService clusterService;
61
62 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 protected StorageService storageService;
64
65 private NodeId localNodeId;
66 private LeaderElector leaderElector;
67 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
68
69 private static final int MAX_RETRIES = 10;
70 private static final int MAX_DELAY_MILLIS_BETWEEN_RETRIES = 100;
71
72 private final Consumer<Change<Leadership>> leadershipChangeListener =
73 change -> {
74 Leadership oldValue = change.oldValue();
75 Leadership newValue = change.newValue();
76 leaderBoard.put(newValue.topic(), newValue);
77 boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
78 boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
79 LeadershipEvent.Type eventType = null;
80 if (leaderChanged && candidatesChanged) {
81 eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
82 }
83 if (leaderChanged && !candidatesChanged) {
84 eventType = LeadershipEvent.Type.LEADER_CHANGED;
85 }
86 if (!leaderChanged && candidatesChanged) {
87 eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
88 }
89 notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
90 };
91
92 @Activate
93 public void activate() {
94 localNodeId = clusterService.getLocalNode().id();
95 leaderElector = storageService.leaderElectorBuilder()
96 .withName("onos-leadership-elections")
97 .build()
98 .asLeaderElector();
99 leaderElector.addChangeListener(leadershipChangeListener);
100 leaderBoard.putAll(getLeaderships());
101 log.info("Started");
102 }
103
104 @Deactivate
105 public void deactivate() {
106 leaderElector.removeChangeListener(leadershipChangeListener);
107 log.info("Stopped");
108 }
109
110 @Override
111 public Leadership addRegistration(String topic) {
112 return Tools.retryable(() -> leaderElector.run(topic, localNodeId),
113 StorageException.class,
114 MAX_RETRIES,
115 MAX_DELAY_MILLIS_BETWEEN_RETRIES).get();
116 }
117
118 @Override
119 public void removeRegistration(String topic) {
120 Tools.retryable(() -> {
121 leaderElector.withdraw(topic);
122 return null;
123 },
124 StorageException.class,
125 MAX_RETRIES,
126 MAX_DELAY_MILLIS_BETWEEN_RETRIES).get();
127 }
128
129 @Override
130 public void removeRegistration(NodeId nodeId) {
131 // TODO
132 throw new UnsupportedOperationException();
133 }
134
135 @Override
136 public boolean moveLeadership(String topic, NodeId toNodeId) {
137 return leaderElector.anoint(topic, toNodeId);
138 }
139
140 @Override
141 public boolean makeTopCandidate(String topic, NodeId nodeId) {
142 // TODO
143 throw new UnsupportedOperationException();
144 }
145
146 @Override
147 public Leadership getLeadership(String topic) {
148 return leaderBoard.get(topic);
149 }
150
151 @Override
152 public Map<String, Leadership> getLeaderships() {
153 return ImmutableMap.copyOf(leaderBoard);
154 }
155}