blob: 71d42fa442673003f24879d4858ad5ebdd18058a [file] [log] [blame]
tomb41d1ac2014-09-24 01:51:24 -07001package org.onlab.onos.store.cluster.impl;
2
alshabib339a3d92014-09-26 17:54:32 -07003import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED;
4
5import java.util.Map;
alshabib339a3d92014-09-26 17:54:32 -07006import java.util.Set;
Yuta HIGUCHIc8e19d42014-09-24 17:20:52 -07007
tomb41d1ac2014-09-24 01:51:24 -07008import org.apache.felix.scr.annotations.Activate;
9import org.apache.felix.scr.annotations.Component;
10import org.apache.felix.scr.annotations.Deactivate;
11import org.apache.felix.scr.annotations.Reference;
12import org.apache.felix.scr.annotations.ReferenceCardinality;
13import org.apache.felix.scr.annotations.Service;
14import org.onlab.onos.cluster.ClusterService;
15import org.onlab.onos.cluster.MastershipEvent;
16import org.onlab.onos.cluster.MastershipStore;
tom0755a362014-09-24 11:54:43 -070017import org.onlab.onos.cluster.MastershipStoreDelegate;
Ayaka Koshibeb70d34b2014-09-25 15:43:01 -070018import org.onlab.onos.cluster.MastershipTerm;
tomb41d1ac2014-09-24 01:51:24 -070019import org.onlab.onos.cluster.NodeId;
20import org.onlab.onos.net.DeviceId;
21import org.onlab.onos.net.MastershipRole;
Yuta HIGUCHIb5df76d2014-09-27 20:54:00 -070022import org.onlab.onos.store.common.AbstractHazelcastStore;
tomb41d1ac2014-09-24 01:51:24 -070023
alshabib339a3d92014-09-26 17:54:32 -070024import com.google.common.collect.ImmutableSet;
Ayaka Koshibe8583ff32014-10-02 16:25:30 -070025import com.hazelcast.core.ILock;
alshabib339a3d92014-09-26 17:54:32 -070026import com.hazelcast.core.IMap;
Ayaka Koshibec4047702014-10-07 14:43:52 -070027import com.hazelcast.core.MultiMap;
tomb41d1ac2014-09-24 01:51:24 -070028
29/**
Ayaka Koshibec4047702014-10-07 14:43:52 -070030 * Distributed implementation of the mastership store. The store is
31 * responsible for the master selection process.
tomb41d1ac2014-09-24 01:51:24 -070032 */
33@Component(immediate = true)
34@Service
tom0755a362014-09-24 11:54:43 -070035public class DistributedMastershipStore
Yuta HIGUCHI2e963892014-09-27 13:00:39 -070036extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
alshabib339a3d92014-09-26 17:54:32 -070037implements MastershipStore {
tomb41d1ac2014-09-24 01:51:24 -070038
Ayaka Koshibe8583ff32014-10-02 16:25:30 -070039 //arbitrary lock name
40 private static final String LOCK = "lock";
Ayaka Koshibec4047702014-10-07 14:43:52 -070041 //initial term/TTL value
Ayaka Koshibe8583ff32014-10-02 16:25:30 -070042 private static final Integer INIT = 0;
Ayaka Koshibe8583ff32014-10-02 16:25:30 -070043
44 //devices to masters
Ayaka Koshibec4047702014-10-07 14:43:52 -070045 protected IMap<byte[], byte[]> masters;
Ayaka Koshibe8583ff32014-10-02 16:25:30 -070046 //devices to terms
Ayaka Koshibec4047702014-10-07 14:43:52 -070047 protected IMap<byte[], Integer> terms;
48
49 //re-election related, disjoint-set structures:
50 //device-nodes multiset of available nodes
51 protected MultiMap<byte[], byte[]> standbys;
52 //device-nodes multiset for nodes that have given up on device
53 protected MultiMap<byte[], byte[]> unusable;
Ayaka Koshibe8583ff32014-10-02 16:25:30 -070054
tomb41d1ac2014-09-24 01:51:24 -070055 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
56 protected ClusterService clusterService;
57
Ayaka Koshibe406d0102014-09-24 16:08:12 -070058 @Override
tomb41d1ac2014-09-24 01:51:24 -070059 @Activate
60 public void activate() {
61 super.activate();
62
Ayaka Koshibec4047702014-10-07 14:43:52 -070063 masters = theInstance.getMap("masters");
64 terms = theInstance.getMap("terms");
65 standbys = theInstance.getMultiMap("backups");
66 unusable = theInstance.getMultiMap("unusable");
tomb41d1ac2014-09-24 01:51:24 -070067
Ayaka Koshibec4047702014-10-07 14:43:52 -070068 masters.addEntryListener(new RemoteMasterShipEventHandler(), true);
Yuta HIGUCHIc8e19d42014-09-24 17:20:52 -070069
tomb41d1ac2014-09-24 01:51:24 -070070 log.info("Started");
71 }
72
73 @Deactivate
74 public void deactivate() {
75 log.info("Stopped");
76 }
77
78 @Override
Ayaka Koshibec4047702014-10-07 14:43:52 -070079 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
80 byte[] did = serialize(deviceId);
81 byte[] nid = serialize(nodeId);
82
83 NodeId current = deserialize(masters.get(did));
84 if (current == null) {
85 if (standbys.containsEntry(did, nid)) {
86 //was previously standby, or set to standby from master
87 return MastershipRole.STANDBY;
88 } else {
89 return MastershipRole.NONE;
90 }
91 } else {
92 if (current.equals(nodeId)) {
93 //*should* be in unusable, not always
94 return MastershipRole.MASTER;
95 } else {
96 //may be in backups or unusable from earlier retirement
97 return MastershipRole.STANDBY;
98 }
99 }
100 }
101
102 @Override
Ayaka Koshibe406d0102014-09-24 16:08:12 -0700103 public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700104 byte [] did = serialize(deviceId);
105 byte [] nid = serialize(nodeId);
tomb41d1ac2014-09-24 01:51:24 -0700106
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700107 ILock lock = theInstance.getLock(LOCK);
108 lock.lock();
109 try {
110 MastershipRole role = getRole(nodeId, deviceId);
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700111 switch (role) {
112 case MASTER:
Ayaka Koshibec4047702014-10-07 14:43:52 -0700113 //reinforce mastership
114 evict(nid, did);
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700115 return null;
116 case STANDBY:
Ayaka Koshibec4047702014-10-07 14:43:52 -0700117 //make current master standby
118 byte [] current = masters.get(did);
119 if (current != null) {
120 backup(current, did);
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700121 }
Ayaka Koshibec4047702014-10-07 14:43:52 -0700122 //assign specified node as new master
123 masters.put(did, nid);
124 evict(nid, did);
125 updateTerm(did);
126 return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
127 case NONE:
128 masters.put(did, nid);
129 evict(nid, did);
130 updateTerm(did);
131 return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700132 default:
133 log.warn("unknown Mastership Role {}", role);
134 return null;
135 }
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700136 } finally {
137 lock.unlock();
tomb41d1ac2014-09-24 01:51:24 -0700138 }
139 }
140
141 @Override
142 public NodeId getMaster(DeviceId deviceId) {
Ayaka Koshibec4047702014-10-07 14:43:52 -0700143 return deserialize(masters.get(serialize(deviceId)));
tomb41d1ac2014-09-24 01:51:24 -0700144 }
145
146 @Override
147 public Set<DeviceId> getDevices(NodeId nodeId) {
148 ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700149
Ayaka Koshibec4047702014-10-07 14:43:52 -0700150 for (Map.Entry<byte[], byte[]> entry : masters.entrySet()) {
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700151 if (nodeId.equals(deserialize(entry.getValue()))) {
152 builder.add((DeviceId) deserialize(entry.getKey()));
tomb41d1ac2014-09-24 01:51:24 -0700153 }
154 }
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700155
tomb41d1ac2014-09-24 01:51:24 -0700156 return builder.build();
157 }
158
159 @Override
160 public MastershipRole requestRole(DeviceId deviceId) {
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700161 NodeId local = clusterService.getLocalNode().id();
Ayaka Koshibec4047702014-10-07 14:43:52 -0700162 byte [] did = serialize(deviceId);
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700163 byte [] lnid = serialize(local);
164
165 ILock lock = theInstance.getLock(LOCK);
166 lock.lock();
167 try {
168 MastershipRole role = getRole(local, deviceId);
169 switch (role) {
170 case MASTER:
Ayaka Koshibec4047702014-10-07 14:43:52 -0700171 evict(lnid, did);
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700172 break;
173 case STANDBY:
Ayaka Koshibec4047702014-10-07 14:43:52 -0700174 backup(lnid, did);
175 terms.putIfAbsent(did, INIT);
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700176 break;
177 case NONE:
Ayaka Koshibec4047702014-10-07 14:43:52 -0700178 //claim mastership
179 masters.put(did, lnid);
180 evict(lnid, did);
181 updateTerm(did);
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700182 role = MastershipRole.MASTER;
183 break;
184 default:
185 log.warn("unknown Mastership Role {}", role);
186 }
187 return role;
188 } finally {
189 lock.unlock();
190 }
tomb41d1ac2014-09-24 01:51:24 -0700191 }
192
193 @Override
Ayaka Koshibeb70d34b2014-09-25 15:43:01 -0700194 public MastershipTerm getTermFor(DeviceId deviceId) {
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700195 byte[] did = serialize(deviceId);
Ayaka Koshibec4047702014-10-07 14:43:52 -0700196 if ((masters.get(did) == null) ||
197 (terms.get(did) == null)) {
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700198 return null;
199 }
200 return MastershipTerm.of(
Ayaka Koshibec4047702014-10-07 14:43:52 -0700201 (NodeId) deserialize(masters.get(did)), terms.get(did));
Ayaka Koshibeb70d34b2014-09-25 15:43:01 -0700202 }
203
Ayaka Koshibed9f693e2014-09-29 18:04:54 -0700204 @Override
Ayaka Koshibec4047702014-10-07 14:43:52 -0700205 public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700206 byte [] did = serialize(deviceId);
Ayaka Koshibec4047702014-10-07 14:43:52 -0700207 byte [] nid = serialize(nodeId);
208 MastershipEvent event = null;
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700209
210 ILock lock = theInstance.getLock(LOCK);
211 lock.lock();
212 try {
213 MastershipRole role = getRole(nodeId, deviceId);
214 switch (role) {
215 case MASTER:
Ayaka Koshibec4047702014-10-07 14:43:52 -0700216 event = reelect(nodeId, deviceId);
217 backup(nid, did);
218 break;
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700219 case STANDBY:
Ayaka Koshibec4047702014-10-07 14:43:52 -0700220 //fall through to reinforce role
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700221 case NONE:
Ayaka Koshibec4047702014-10-07 14:43:52 -0700222 backup(nid, did);
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700223 break;
224 default:
225 log.warn("unknown Mastership Role {}", role);
226 }
Ayaka Koshibec4047702014-10-07 14:43:52 -0700227 return event;
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700228 } finally {
229 lock.unlock();
230 }
231 }
232
Ayaka Koshibec4047702014-10-07 14:43:52 -0700233 @Override
234 public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
235 byte [] did = serialize(deviceId);
236 byte [] nid = serialize(nodeId);
237 MastershipEvent event = null;
Ayaka Koshibe25fd23a2014-10-03 15:50:43 -0700238
Ayaka Koshibec4047702014-10-07 14:43:52 -0700239 ILock lock = theInstance.getLock(LOCK);
240 lock.lock();
241 try {
242 MastershipRole role = getRole(nodeId, deviceId);
243 switch (role) {
244 case MASTER:
245 event = reelect(nodeId, deviceId);
246 evict(nid, did);
247 break;
248 case STANDBY:
249 //fall through to reinforce relinquishment
250 case NONE:
251 evict(nid, did);
252 break;
253 default:
254 log.warn("unknown Mastership Role {}", role);
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700255 }
Ayaka Koshibec4047702014-10-07 14:43:52 -0700256 return event;
257 } finally {
258 lock.unlock();
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700259 }
Ayaka Koshibed9f693e2014-09-29 18:04:54 -0700260 }
261
Ayaka Koshibec4047702014-10-07 14:43:52 -0700262 //helper to fetch a new master candidate for a given device.
263 private MastershipEvent reelect(NodeId current, DeviceId deviceId) {
264 byte [] did = serialize(deviceId);
265 byte [] nid = serialize(current);
266
267 //if this is an queue it'd be neater.
268 byte [] backup = null;
269 for (byte [] n : standbys.get(serialize(deviceId))) {
270 if (!current.equals(deserialize(n))) {
271 backup = n;
272 break;
273 }
274 }
275
276 if (backup == null) {
277 masters.remove(did, nid);
278 return null;
279 } else {
280 masters.put(did, backup);
281 evict(backup, did);
282 Integer term = terms.get(did);
283 terms.put(did, ++term);
284 return new MastershipEvent(
285 MASTER_CHANGED, deviceId, (NodeId) deserialize(backup));
286 }
287 }
288
289 //adds node to pool(s) of backups and moves them from unusable.
290 private void backup(byte [] nodeId, byte [] deviceId) {
291 if (!standbys.containsEntry(deviceId, nodeId)) {
292 standbys.put(deviceId, nodeId);
293 }
294 if (unusable.containsEntry(deviceId, nodeId)) {
295 unusable.remove(deviceId, nodeId);
296 }
297 }
298
299 //adds node to unusable and evicts it from backup pool.
300 private void evict(byte [] nodeId, byte [] deviceId) {
301 if (!unusable.containsEntry(deviceId, nodeId)) {
302 unusable.put(deviceId, nodeId);
303 }
304 if (standbys.containsEntry(deviceId, nodeId)) {
305 standbys.remove(deviceId, nodeId);
306 }
307 }
308
309 //adds or updates term information.
310 private void updateTerm(byte [] deviceId) {
311 Integer term = terms.get(deviceId);
312 if (term == null) {
313 terms.put(deviceId, INIT);
314 } else {
315 terms.put(deviceId, ++term);
316 }
Ayaka Koshibe8583ff32014-10-02 16:25:30 -0700317 }
318
Ayaka Koshibe5c0f2372014-10-02 17:59:04 -0700319 private class RemoteMasterShipEventHandler extends RemoteEventHandler<DeviceId, NodeId> {
alshabib339a3d92014-09-26 17:54:32 -0700320
321 @Override
322 protected void onAdd(DeviceId deviceId, NodeId nodeId) {
323 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
324 }
325
326 @Override
327 protected void onRemove(DeviceId deviceId, NodeId nodeId) {
Ayaka Koshibe5c0f2372014-10-02 17:59:04 -0700328 //notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
alshabib339a3d92014-09-26 17:54:32 -0700329 }
330
331 @Override
332 protected void onUpdate(DeviceId deviceId, NodeId oldNodeId, NodeId nodeId) {
Ayaka Koshibec4047702014-10-07 14:43:52 -0700333 //only addition indicates a change in mastership
Ayaka Koshibe5c0f2372014-10-02 17:59:04 -0700334 //notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
alshabib339a3d92014-09-26 17:54:32 -0700335 }
336 }
337
tomb41d1ac2014-09-24 01:51:24 -0700338}