blob: 8983cf551e28e66a2ecddc5bf906d1a4e110bf3b [file] [log] [blame]
tom2d7c65f2014-09-23 01:09:35 -07001package org.onlab.onos.store.cluster.impl;
2
tomb41d1ac2014-09-24 01:51:24 -07003import com.google.common.base.Optional;
4import com.google.common.cache.LoadingCache;
tom2d7c65f2014-09-23 01:09:35 -07005import com.google.common.collect.ImmutableSet;
tomb41d1ac2014-09-24 01:51:24 -07006import com.hazelcast.core.IMap;
tom2d7c65f2014-09-23 01:09:35 -07007import com.hazelcast.core.Member;
tomb41d1ac2014-09-24 01:51:24 -07008import com.hazelcast.core.MemberAttributeEvent;
9import com.hazelcast.core.MembershipEvent;
10import com.hazelcast.core.MembershipListener;
tom2d7c65f2014-09-23 01:09:35 -070011import org.apache.felix.scr.annotations.Activate;
12import org.apache.felix.scr.annotations.Component;
13import org.apache.felix.scr.annotations.Deactivate;
tom2d7c65f2014-09-23 01:09:35 -070014import org.apache.felix.scr.annotations.Service;
tom0755a362014-09-24 11:54:43 -070015import org.onlab.onos.cluster.ClusterEvent;
tom2d7c65f2014-09-23 01:09:35 -070016import org.onlab.onos.cluster.ClusterStore;
tom0755a362014-09-24 11:54:43 -070017import org.onlab.onos.cluster.ClusterStoreDelegate;
tom2d7c65f2014-09-23 01:09:35 -070018import org.onlab.onos.cluster.ControllerNode;
19import org.onlab.onos.cluster.DefaultControllerNode;
20import org.onlab.onos.cluster.NodeId;
tomb41d1ac2014-09-24 01:51:24 -070021import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
22import org.onlab.onos.store.impl.AbstractDistributedStore;
23import org.onlab.onos.store.impl.OptionalCacheLoader;
tom2d7c65f2014-09-23 01:09:35 -070024import org.onlab.packet.IpPrefix;
tom2d7c65f2014-09-23 01:09:35 -070025
tomb41d1ac2014-09-24 01:51:24 -070026import java.util.Map;
tom2d7c65f2014-09-23 01:09:35 -070027import java.util.Set;
tomb41d1ac2014-09-24 01:51:24 -070028import java.util.concurrent.ConcurrentHashMap;
tom2d7c65f2014-09-23 01:09:35 -070029
tomb41d1ac2014-09-24 01:51:24 -070030import static com.google.common.cache.CacheBuilder.newBuilder;
tom0755a362014-09-24 11:54:43 -070031import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
32import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
tomb41d1ac2014-09-24 01:51:24 -070033import static org.onlab.onos.cluster.ControllerNode.State;
tom2d7c65f2014-09-23 01:09:35 -070034
35/**
36 * Distributed implementation of the cluster nodes store.
37 */
38@Component(immediate = true)
39@Service
tom0755a362014-09-24 11:54:43 -070040public class DistributedClusterStore
41 extends AbstractDistributedStore<ClusterEvent, ClusterStoreDelegate>
tomb41d1ac2014-09-24 01:51:24 -070042 implements ClusterStore {
tom2d7c65f2014-09-23 01:09:35 -070043
tomb41d1ac2014-09-24 01:51:24 -070044 private IMap<byte[], byte[]> rawNodes;
45 private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
tom2d7c65f2014-09-23 01:09:35 -070046
tomb41d1ac2014-09-24 01:51:24 -070047 private String listenerId;
tom0755a362014-09-24 11:54:43 -070048 private final MembershipListener listener = new InternalMembershipListener();
tomb41d1ac2014-09-24 01:51:24 -070049 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
tom2d7c65f2014-09-23 01:09:35 -070050
51 @Activate
52 public void activate() {
tomb41d1ac2014-09-24 01:51:24 -070053 super.activate();
54 listenerId = theInstance.getCluster().addMembershipListener(listener);
tom2d7c65f2014-09-23 01:09:35 -070055
tomb41d1ac2014-09-24 01:51:24 -070056 rawNodes = theInstance.getMap("nodes");
57 OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
58 = new OptionalCacheLoader<>(storeService, rawNodes);
59 nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
60 rawNodes.addEntryListener(new RemoteEventHandler<>(nodes), true);
61
62 loadClusterNodes();
63
64 log.info("Started");
65 }
66
67 // Loads the initial set of cluster nodes
68 private void loadClusterNodes() {
69 for (Member member : theInstance.getCluster().getMembers()) {
tomee49c372014-09-26 15:14:50 -070070 addNode(node(member));
tomb41d1ac2014-09-24 01:51:24 -070071 }
tom2d7c65f2014-09-23 01:09:35 -070072 }
73
74 @Deactivate
75 public void deactivate() {
tomb41d1ac2014-09-24 01:51:24 -070076 theInstance.getCluster().removeMembershipListener(listenerId);
tom2d7c65f2014-09-23 01:09:35 -070077 log.info("Stopped");
78 }
79
80 @Override
81 public ControllerNode getLocalNode() {
82 return node(theInstance.getCluster().getLocalMember());
83 }
84
85 @Override
86 public Set<ControllerNode> getNodes() {
87 ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
tomb41d1ac2014-09-24 01:51:24 -070088 for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
89 builder.add(optional.get());
tom2d7c65f2014-09-23 01:09:35 -070090 }
91 return builder.build();
92 }
93
94 @Override
95 public ControllerNode getNode(NodeId nodeId) {
tomb41d1ac2014-09-24 01:51:24 -070096 return nodes.getUnchecked(nodeId).orNull();
tom2d7c65f2014-09-23 01:09:35 -070097 }
98
99 @Override
tomb41d1ac2014-09-24 01:51:24 -0700100 public State getState(NodeId nodeId) {
101 State state = states.get(nodeId);
102 return state == null ? State.INACTIVE : state;
103 }
104
105 @Override
tomee49c372014-09-26 15:14:50 -0700106 public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
107 return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
108 }
109
110 @Override
tomb41d1ac2014-09-24 01:51:24 -0700111 public void removeNode(NodeId nodeId) {
112 synchronized (this) {
113 rawNodes.remove(serialize(nodeId));
114 nodes.invalidate(nodeId);
115 }
116 }
117
118 // Adds a new node based on the specified member
tomee49c372014-09-26 15:14:50 -0700119 private synchronized ControllerNode addNode(DefaultControllerNode node) {
tomb41d1ac2014-09-24 01:51:24 -0700120 rawNodes.put(serialize(node.id()), serialize(node));
121 nodes.put(node.id(), Optional.of(node));
122 states.put(node.id(), State.ACTIVE);
tom0755a362014-09-24 11:54:43 -0700123 return node;
tom2d7c65f2014-09-23 01:09:35 -0700124 }
125
126 // Creates a controller node descriptor from the Hazelcast member.
tomb41d1ac2014-09-24 01:51:24 -0700127 private DefaultControllerNode node(Member member) {
128 IpPrefix ip = memberAddress(member);
129 return new DefaultControllerNode(new NodeId(ip.toString()), ip);
130 }
131
132 private IpPrefix memberAddress(Member member) {
133 byte[] address = member.getSocketAddress().getAddress().getAddress();
134 return IpPrefix.valueOf(address);
135 }
136
137 // Interceptor for membership events.
tom0755a362014-09-24 11:54:43 -0700138 private class InternalMembershipListener implements MembershipListener {
tomb41d1ac2014-09-24 01:51:24 -0700139 @Override
140 public void memberAdded(MembershipEvent membershipEvent) {
141 log.info("Member {} added", membershipEvent.getMember());
tomee49c372014-09-26 15:14:50 -0700142 ControllerNode node = addNode(node(membershipEvent.getMember()));
tom0755a362014-09-24 11:54:43 -0700143 notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
tomb41d1ac2014-09-24 01:51:24 -0700144 }
145
146 @Override
147 public void memberRemoved(MembershipEvent membershipEvent) {
148 log.info("Member {} removed", membershipEvent.getMember());
tom0755a362014-09-24 11:54:43 -0700149 NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
150 states.put(nodeId, State.INACTIVE);
151 notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
tomb41d1ac2014-09-24 01:51:24 -0700152 }
153
154 @Override
155 public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
156 log.info("Member {} attribute {} changed to {}",
157 memberAttributeEvent.getMember(),
158 memberAttributeEvent.getKey(),
159 memberAttributeEvent.getValue());
160 }
tom2d7c65f2014-09-23 01:09:35 -0700161 }
162}