blob: 77a28f572a91ed34649558a896f206575f19f7a2 [file] [log] [blame]
tom2d7c65f2014-09-23 01:09:35 -07001package org.onlab.onos.store.cluster.impl;
2
tomb41d1ac2014-09-24 01:51:24 -07003import com.google.common.base.Optional;
4import com.google.common.cache.LoadingCache;
tom2d7c65f2014-09-23 01:09:35 -07005import com.google.common.collect.ImmutableSet;
tomb41d1ac2014-09-24 01:51:24 -07006import com.hazelcast.core.IMap;
tom2d7c65f2014-09-23 01:09:35 -07007import com.hazelcast.core.Member;
tomb41d1ac2014-09-24 01:51:24 -07008import com.hazelcast.core.MemberAttributeEvent;
9import com.hazelcast.core.MembershipEvent;
10import com.hazelcast.core.MembershipListener;
tom2d7c65f2014-09-23 01:09:35 -070011import org.apache.felix.scr.annotations.Activate;
12import org.apache.felix.scr.annotations.Component;
13import org.apache.felix.scr.annotations.Deactivate;
tom2d7c65f2014-09-23 01:09:35 -070014import org.apache.felix.scr.annotations.Service;
15import org.onlab.onos.cluster.ClusterStore;
16import org.onlab.onos.cluster.ControllerNode;
17import org.onlab.onos.cluster.DefaultControllerNode;
18import org.onlab.onos.cluster.NodeId;
tomb41d1ac2014-09-24 01:51:24 -070019import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
20import org.onlab.onos.store.impl.AbstractDistributedStore;
21import org.onlab.onos.store.impl.OptionalCacheLoader;
tom2d7c65f2014-09-23 01:09:35 -070022import org.onlab.packet.IpPrefix;
tom2d7c65f2014-09-23 01:09:35 -070023
tomb41d1ac2014-09-24 01:51:24 -070024import java.util.Map;
tom2d7c65f2014-09-23 01:09:35 -070025import java.util.Set;
tomb41d1ac2014-09-24 01:51:24 -070026import java.util.concurrent.ConcurrentHashMap;
tom2d7c65f2014-09-23 01:09:35 -070027
tomb41d1ac2014-09-24 01:51:24 -070028import static com.google.common.cache.CacheBuilder.newBuilder;
29import static org.onlab.onos.cluster.ControllerNode.State;
tom2d7c65f2014-09-23 01:09:35 -070030
31/**
32 * Distributed implementation of the cluster nodes store.
33 */
34@Component(immediate = true)
35@Service
tomb41d1ac2014-09-24 01:51:24 -070036public class DistributedClusterStore extends AbstractDistributedStore
37 implements ClusterStore {
tom2d7c65f2014-09-23 01:09:35 -070038
tomb41d1ac2014-09-24 01:51:24 -070039 private IMap<byte[], byte[]> rawNodes;
40 private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
tom2d7c65f2014-09-23 01:09:35 -070041
tomb41d1ac2014-09-24 01:51:24 -070042 private String listenerId;
43 private final MembershipListener listener = new InnerMembershipListener();
44 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
tom2d7c65f2014-09-23 01:09:35 -070045
46 @Activate
47 public void activate() {
tomb41d1ac2014-09-24 01:51:24 -070048 super.activate();
49 listenerId = theInstance.getCluster().addMembershipListener(listener);
tom2d7c65f2014-09-23 01:09:35 -070050
tomb41d1ac2014-09-24 01:51:24 -070051 rawNodes = theInstance.getMap("nodes");
52 OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
53 = new OptionalCacheLoader<>(storeService, rawNodes);
54 nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
55 rawNodes.addEntryListener(new RemoteEventHandler<>(nodes), true);
56
57 loadClusterNodes();
58
59 log.info("Started");
60 }
61
62 // Loads the initial set of cluster nodes
63 private void loadClusterNodes() {
64 for (Member member : theInstance.getCluster().getMembers()) {
65 addMember(member);
66 }
tom2d7c65f2014-09-23 01:09:35 -070067 }
68
69 @Deactivate
70 public void deactivate() {
tomb41d1ac2014-09-24 01:51:24 -070071 theInstance.getCluster().removeMembershipListener(listenerId);
tom2d7c65f2014-09-23 01:09:35 -070072 log.info("Stopped");
73 }
74
75 @Override
76 public ControllerNode getLocalNode() {
77 return node(theInstance.getCluster().getLocalMember());
78 }
79
80 @Override
81 public Set<ControllerNode> getNodes() {
82 ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
tomb41d1ac2014-09-24 01:51:24 -070083 for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
84 builder.add(optional.get());
tom2d7c65f2014-09-23 01:09:35 -070085 }
86 return builder.build();
87 }
88
89 @Override
90 public ControllerNode getNode(NodeId nodeId) {
tomb41d1ac2014-09-24 01:51:24 -070091 return nodes.getUnchecked(nodeId).orNull();
tom2d7c65f2014-09-23 01:09:35 -070092 }
93
94 @Override
tomb41d1ac2014-09-24 01:51:24 -070095 public State getState(NodeId nodeId) {
96 State state = states.get(nodeId);
97 return state == null ? State.INACTIVE : state;
98 }
99
100 @Override
101 public void removeNode(NodeId nodeId) {
102 synchronized (this) {
103 rawNodes.remove(serialize(nodeId));
104 nodes.invalidate(nodeId);
105 }
106 }
107
108 // Adds a new node based on the specified member
109 private synchronized void addMember(Member member) {
110 DefaultControllerNode node = node(member);
111 rawNodes.put(serialize(node.id()), serialize(node));
112 nodes.put(node.id(), Optional.of(node));
113 states.put(node.id(), State.ACTIVE);
tom2d7c65f2014-09-23 01:09:35 -0700114 }
115
116 // Creates a controller node descriptor from the Hazelcast member.
tomb41d1ac2014-09-24 01:51:24 -0700117 private DefaultControllerNode node(Member member) {
118 IpPrefix ip = memberAddress(member);
119 return new DefaultControllerNode(new NodeId(ip.toString()), ip);
120 }
121
122 private IpPrefix memberAddress(Member member) {
123 byte[] address = member.getSocketAddress().getAddress().getAddress();
124 return IpPrefix.valueOf(address);
125 }
126
127 // Interceptor for membership events.
128 private class InnerMembershipListener implements MembershipListener {
129 @Override
130 public void memberAdded(MembershipEvent membershipEvent) {
131 log.info("Member {} added", membershipEvent.getMember());
132 addMember(membershipEvent.getMember());
133 }
134
135 @Override
136 public void memberRemoved(MembershipEvent membershipEvent) {
137 log.info("Member {} removed", membershipEvent.getMember());
138 states.put(new NodeId(memberAddress(membershipEvent.getMember()).toString()),
139 State.INACTIVE);
140 }
141
142 @Override
143 public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
144 log.info("Member {} attribute {} changed to {}",
145 memberAttributeEvent.getMember(),
146 memberAttributeEvent.getKey(),
147 memberAttributeEvent.getValue());
148 }
tom2d7c65f2014-09-23 01:09:35 -0700149 }
150}