blob: 004f807478eca6725d1a965701940a771175c7d2 [file] [log] [blame]
tom2d7c65f2014-09-23 01:09:35 -07001package org.onlab.onos.store.cluster.impl;
2
tomb41d1ac2014-09-24 01:51:24 -07003import com.google.common.base.Optional;
4import com.google.common.cache.LoadingCache;
tom2d7c65f2014-09-23 01:09:35 -07005import com.google.common.collect.ImmutableSet;
tomb41d1ac2014-09-24 01:51:24 -07006import com.hazelcast.core.IMap;
tom2d7c65f2014-09-23 01:09:35 -07007import com.hazelcast.core.Member;
tomb41d1ac2014-09-24 01:51:24 -07008import com.hazelcast.core.MemberAttributeEvent;
9import com.hazelcast.core.MembershipEvent;
10import com.hazelcast.core.MembershipListener;
tom2d7c65f2014-09-23 01:09:35 -070011import org.apache.felix.scr.annotations.Activate;
12import org.apache.felix.scr.annotations.Component;
13import org.apache.felix.scr.annotations.Deactivate;
tom2d7c65f2014-09-23 01:09:35 -070014import org.apache.felix.scr.annotations.Service;
tom0755a362014-09-24 11:54:43 -070015import org.onlab.onos.cluster.ClusterEvent;
tom2d7c65f2014-09-23 01:09:35 -070016import org.onlab.onos.cluster.ClusterStore;
tom0755a362014-09-24 11:54:43 -070017import org.onlab.onos.cluster.ClusterStoreDelegate;
tom2d7c65f2014-09-23 01:09:35 -070018import org.onlab.onos.cluster.ControllerNode;
19import org.onlab.onos.cluster.DefaultControllerNode;
20import org.onlab.onos.cluster.NodeId;
tomb41d1ac2014-09-24 01:51:24 -070021import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
22import org.onlab.onos.store.impl.AbstractDistributedStore;
23import org.onlab.onos.store.impl.OptionalCacheLoader;
tom2d7c65f2014-09-23 01:09:35 -070024import org.onlab.packet.IpPrefix;
tom2d7c65f2014-09-23 01:09:35 -070025
tomb41d1ac2014-09-24 01:51:24 -070026import java.util.Map;
tom2d7c65f2014-09-23 01:09:35 -070027import java.util.Set;
tomb41d1ac2014-09-24 01:51:24 -070028import java.util.concurrent.ConcurrentHashMap;
tom2d7c65f2014-09-23 01:09:35 -070029
tomb41d1ac2014-09-24 01:51:24 -070030import static com.google.common.cache.CacheBuilder.newBuilder;
tom0755a362014-09-24 11:54:43 -070031import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
32import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
tomb41d1ac2014-09-24 01:51:24 -070033import static org.onlab.onos.cluster.ControllerNode.State;
tom2d7c65f2014-09-23 01:09:35 -070034
35/**
36 * Distributed implementation of the cluster nodes store.
37 */
38@Component(immediate = true)
39@Service
tom0755a362014-09-24 11:54:43 -070040public class DistributedClusterStore
41 extends AbstractDistributedStore<ClusterEvent, ClusterStoreDelegate>
tomb41d1ac2014-09-24 01:51:24 -070042 implements ClusterStore {
tom2d7c65f2014-09-23 01:09:35 -070043
tomb41d1ac2014-09-24 01:51:24 -070044 private IMap<byte[], byte[]> rawNodes;
45 private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
tom2d7c65f2014-09-23 01:09:35 -070046
tomb41d1ac2014-09-24 01:51:24 -070047 private String listenerId;
tom0755a362014-09-24 11:54:43 -070048 private final MembershipListener listener = new InternalMembershipListener();
tomb41d1ac2014-09-24 01:51:24 -070049 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
tom2d7c65f2014-09-23 01:09:35 -070050
51 @Activate
52 public void activate() {
tomb41d1ac2014-09-24 01:51:24 -070053 super.activate();
54 listenerId = theInstance.getCluster().addMembershipListener(listener);
tom2d7c65f2014-09-23 01:09:35 -070055
tomb41d1ac2014-09-24 01:51:24 -070056 rawNodes = theInstance.getMap("nodes");
57 OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
58 = new OptionalCacheLoader<>(storeService, rawNodes);
59 nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
60 rawNodes.addEntryListener(new RemoteEventHandler<>(nodes), true);
61
62 loadClusterNodes();
63
64 log.info("Started");
65 }
66
67 // Loads the initial set of cluster nodes
68 private void loadClusterNodes() {
69 for (Member member : theInstance.getCluster().getMembers()) {
70 addMember(member);
71 }
tom2d7c65f2014-09-23 01:09:35 -070072 }
73
74 @Deactivate
75 public void deactivate() {
tomb41d1ac2014-09-24 01:51:24 -070076 theInstance.getCluster().removeMembershipListener(listenerId);
tom2d7c65f2014-09-23 01:09:35 -070077 log.info("Stopped");
78 }
79
80 @Override
81 public ControllerNode getLocalNode() {
82 return node(theInstance.getCluster().getLocalMember());
83 }
84
85 @Override
86 public Set<ControllerNode> getNodes() {
87 ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
tomb41d1ac2014-09-24 01:51:24 -070088 for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
89 builder.add(optional.get());
tom2d7c65f2014-09-23 01:09:35 -070090 }
91 return builder.build();
92 }
93
94 @Override
95 public ControllerNode getNode(NodeId nodeId) {
tomb41d1ac2014-09-24 01:51:24 -070096 return nodes.getUnchecked(nodeId).orNull();
tom2d7c65f2014-09-23 01:09:35 -070097 }
98
99 @Override
tomb41d1ac2014-09-24 01:51:24 -0700100 public State getState(NodeId nodeId) {
101 State state = states.get(nodeId);
102 return state == null ? State.INACTIVE : state;
103 }
104
105 @Override
106 public void removeNode(NodeId nodeId) {
107 synchronized (this) {
108 rawNodes.remove(serialize(nodeId));
109 nodes.invalidate(nodeId);
110 }
111 }
112
113 // Adds a new node based on the specified member
tom0755a362014-09-24 11:54:43 -0700114 private synchronized ControllerNode addMember(Member member) {
tomb41d1ac2014-09-24 01:51:24 -0700115 DefaultControllerNode node = node(member);
116 rawNodes.put(serialize(node.id()), serialize(node));
117 nodes.put(node.id(), Optional.of(node));
118 states.put(node.id(), State.ACTIVE);
tom0755a362014-09-24 11:54:43 -0700119 return node;
tom2d7c65f2014-09-23 01:09:35 -0700120 }
121
122 // Creates a controller node descriptor from the Hazelcast member.
tomb41d1ac2014-09-24 01:51:24 -0700123 private DefaultControllerNode node(Member member) {
124 IpPrefix ip = memberAddress(member);
125 return new DefaultControllerNode(new NodeId(ip.toString()), ip);
126 }
127
128 private IpPrefix memberAddress(Member member) {
129 byte[] address = member.getSocketAddress().getAddress().getAddress();
130 return IpPrefix.valueOf(address);
131 }
132
133 // Interceptor for membership events.
tom0755a362014-09-24 11:54:43 -0700134 private class InternalMembershipListener implements MembershipListener {
tomb41d1ac2014-09-24 01:51:24 -0700135 @Override
136 public void memberAdded(MembershipEvent membershipEvent) {
137 log.info("Member {} added", membershipEvent.getMember());
tom0755a362014-09-24 11:54:43 -0700138 ControllerNode node = addMember(membershipEvent.getMember());
139 notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
tomb41d1ac2014-09-24 01:51:24 -0700140 }
141
142 @Override
143 public void memberRemoved(MembershipEvent membershipEvent) {
144 log.info("Member {} removed", membershipEvent.getMember());
tom0755a362014-09-24 11:54:43 -0700145 NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
146 states.put(nodeId, State.INACTIVE);
147 notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
tomb41d1ac2014-09-24 01:51:24 -0700148 }
149
150 @Override
151 public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
152 log.info("Member {} attribute {} changed to {}",
153 memberAttributeEvent.getMember(),
154 memberAttributeEvent.getKey(),
155 memberAttributeEvent.getValue());
156 }
tom2d7c65f2014-09-23 01:09:35 -0700157 }
158}