blob: c15d6aa6f84d2006d1b395d0208e7a732645d0cf [file] [log] [blame]
tomb41d1ac2014-09-24 01:51:24 -07001package org.onlab.onos.store.cluster.impl;
2
3import com.google.common.base.Optional;
4import com.google.common.cache.LoadingCache;
5import com.google.common.collect.ImmutableSet;
6import com.hazelcast.core.IMap;
7import org.apache.felix.scr.annotations.Activate;
8import org.apache.felix.scr.annotations.Component;
9import org.apache.felix.scr.annotations.Deactivate;
10import org.apache.felix.scr.annotations.Reference;
11import org.apache.felix.scr.annotations.ReferenceCardinality;
12import org.apache.felix.scr.annotations.Service;
13import org.onlab.onos.cluster.ClusterService;
14import org.onlab.onos.cluster.MastershipEvent;
15import org.onlab.onos.cluster.MastershipStore;
16import org.onlab.onos.cluster.NodeId;
17import org.onlab.onos.net.DeviceId;
18import org.onlab.onos.net.MastershipRole;
19import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
20import org.onlab.onos.store.impl.AbstractDistributedStore;
21import org.onlab.onos.store.impl.OptionalCacheLoader;
22
23import java.util.Map;
24import java.util.Objects;
25import java.util.Set;
26
27import static com.google.common.cache.CacheBuilder.newBuilder;
28
29/**
30 * Distributed implementation of the cluster nodes store.
31 */
32@Component(immediate = true)
33@Service
34public class DistributedMastershipStore extends AbstractDistributedStore
35 implements MastershipStore {
36
37 private IMap<byte[], byte[]> rawMasters;
38 private LoadingCache<DeviceId, Optional<NodeId>> masters;
39
40 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
41 protected ClusterService clusterService;
42
Ayaka Koshibe406d0102014-09-24 16:08:12 -070043 @Override
tomb41d1ac2014-09-24 01:51:24 -070044 @Activate
45 public void activate() {
46 super.activate();
47
48 rawMasters = theInstance.getMap("masters");
49 OptionalCacheLoader<DeviceId, NodeId> nodeLoader
50 = new OptionalCacheLoader<>(storeService, rawMasters);
51 masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
52 rawMasters.addEntryListener(new RemoteEventHandler<>(masters), true);
53
54 log.info("Started");
55 }
56
57 @Deactivate
58 public void deactivate() {
59 log.info("Stopped");
60 }
61
62 @Override
Ayaka Koshibe406d0102014-09-24 16:08:12 -070063 public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
tomb41d1ac2014-09-24 01:51:24 -070064 synchronized (this) {
65 NodeId currentMaster = getMaster(deviceId);
Ayaka Koshibe406d0102014-09-24 16:08:12 -070066 if (Objects.equals(currentMaster, nodeId)) {
tomb41d1ac2014-09-24 01:51:24 -070067 return null;
68 }
69
70 // FIXME: for now implementing semantics of setMaster
71 rawMasters.put(serialize(deviceId), serialize(nodeId));
72 masters.put(deviceId, Optional.of(nodeId));
73 return new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, nodeId);
74 }
75 }
76
77 @Override
78 public NodeId getMaster(DeviceId deviceId) {
79 return masters.getUnchecked(deviceId).orNull();
80 }
81
82 @Override
83 public Set<DeviceId> getDevices(NodeId nodeId) {
84 ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
85 for (Map.Entry<DeviceId, Optional<NodeId>> entry : masters.asMap().entrySet()) {
86 if (nodeId.equals(entry.getValue().get())) {
87 builder.add(entry.getKey());
88 }
89 }
90 return builder.build();
91 }
92
93 @Override
94 public MastershipRole requestRole(DeviceId deviceId) {
95 // FIXME: for now we are 'selecting' as master whoever asks
Ayaka Koshibe406d0102014-09-24 16:08:12 -070096 setMaster(clusterService.getLocalNode().id(), deviceId);
tomb41d1ac2014-09-24 01:51:24 -070097 return MastershipRole.MASTER;
98 }
99
100 @Override
101 public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
102 NodeId master = masters.getUnchecked(deviceId).orNull();
103 return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY;
104 }
105
106}