blob: 57d2358817f295d9ad7d57d5f09684b6c65e7ac6 [file] [log] [blame]
tom2d7c65f2014-09-23 01:09:35 -07001package org.onlab.onos.store.cluster.impl;
2
tomb41d1ac2014-09-24 01:51:24 -07003import com.google.common.base.Optional;
4import com.google.common.cache.LoadingCache;
tom2d7c65f2014-09-23 01:09:35 -07005import com.google.common.collect.ImmutableSet;
tomb41d1ac2014-09-24 01:51:24 -07006import com.hazelcast.core.IMap;
tom2d7c65f2014-09-23 01:09:35 -07007import com.hazelcast.core.Member;
tomb41d1ac2014-09-24 01:51:24 -07008import com.hazelcast.core.MemberAttributeEvent;
9import com.hazelcast.core.MembershipEvent;
10import com.hazelcast.core.MembershipListener;
Yuta HIGUCHIb5df76d2014-09-27 20:54:00 -070011
tom2d7c65f2014-09-23 01:09:35 -070012import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
tom2d7c65f2014-09-23 01:09:35 -070015import org.apache.felix.scr.annotations.Service;
tom0755a362014-09-24 11:54:43 -070016import org.onlab.onos.cluster.ClusterEvent;
tom2d7c65f2014-09-23 01:09:35 -070017import org.onlab.onos.cluster.ClusterStore;
tom0755a362014-09-24 11:54:43 -070018import org.onlab.onos.cluster.ClusterStoreDelegate;
tom2d7c65f2014-09-23 01:09:35 -070019import org.onlab.onos.cluster.ControllerNode;
20import org.onlab.onos.cluster.DefaultControllerNode;
21import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIb5df76d2014-09-27 20:54:00 -070022import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
23import org.onlab.onos.store.common.AbstractHazelcastStore;
24import org.onlab.onos.store.common.OptionalCacheLoader;
tom2d7c65f2014-09-23 01:09:35 -070025import org.onlab.packet.IpPrefix;
tom2d7c65f2014-09-23 01:09:35 -070026
tomb41d1ac2014-09-24 01:51:24 -070027import java.util.Map;
tom2d7c65f2014-09-23 01:09:35 -070028import java.util.Set;
tomb41d1ac2014-09-24 01:51:24 -070029import java.util.concurrent.ConcurrentHashMap;
tom2d7c65f2014-09-23 01:09:35 -070030
tomb41d1ac2014-09-24 01:51:24 -070031import static com.google.common.cache.CacheBuilder.newBuilder;
tom0755a362014-09-24 11:54:43 -070032import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
33import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
tomb41d1ac2014-09-24 01:51:24 -070034import static org.onlab.onos.cluster.ControllerNode.State;
tom2d7c65f2014-09-23 01:09:35 -070035
36/**
37 * Distributed implementation of the cluster nodes store.
38 */
39@Component(immediate = true)
40@Service
tom0755a362014-09-24 11:54:43 -070041public class DistributedClusterStore
Yuta HIGUCHI2e963892014-09-27 13:00:39 -070042 extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
tomb41d1ac2014-09-24 01:51:24 -070043 implements ClusterStore {
tom2d7c65f2014-09-23 01:09:35 -070044
tomb41d1ac2014-09-24 01:51:24 -070045 private IMap<byte[], byte[]> rawNodes;
46 private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
tom2d7c65f2014-09-23 01:09:35 -070047
tomb41d1ac2014-09-24 01:51:24 -070048 private String listenerId;
tom0755a362014-09-24 11:54:43 -070049 private final MembershipListener listener = new InternalMembershipListener();
tomb41d1ac2014-09-24 01:51:24 -070050 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
tom2d7c65f2014-09-23 01:09:35 -070051
52 @Activate
53 public void activate() {
tomb41d1ac2014-09-24 01:51:24 -070054 super.activate();
55 listenerId = theInstance.getCluster().addMembershipListener(listener);
tom2d7c65f2014-09-23 01:09:35 -070056
tomb41d1ac2014-09-24 01:51:24 -070057 rawNodes = theInstance.getMap("nodes");
58 OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
59 = new OptionalCacheLoader<>(storeService, rawNodes);
60 nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
61 rawNodes.addEntryListener(new RemoteEventHandler<>(nodes), true);
62
63 loadClusterNodes();
64
65 log.info("Started");
66 }
67
68 // Loads the initial set of cluster nodes
69 private void loadClusterNodes() {
70 for (Member member : theInstance.getCluster().getMembers()) {
71 addMember(member);
72 }
tom2d7c65f2014-09-23 01:09:35 -070073 }
74
75 @Deactivate
76 public void deactivate() {
tomb41d1ac2014-09-24 01:51:24 -070077 theInstance.getCluster().removeMembershipListener(listenerId);
tom2d7c65f2014-09-23 01:09:35 -070078 log.info("Stopped");
79 }
80
81 @Override
82 public ControllerNode getLocalNode() {
83 return node(theInstance.getCluster().getLocalMember());
84 }
85
86 @Override
87 public Set<ControllerNode> getNodes() {
88 ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
tomb41d1ac2014-09-24 01:51:24 -070089 for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
90 builder.add(optional.get());
tom2d7c65f2014-09-23 01:09:35 -070091 }
92 return builder.build();
93 }
94
95 @Override
96 public ControllerNode getNode(NodeId nodeId) {
tomb41d1ac2014-09-24 01:51:24 -070097 return nodes.getUnchecked(nodeId).orNull();
tom2d7c65f2014-09-23 01:09:35 -070098 }
99
100 @Override
tomb41d1ac2014-09-24 01:51:24 -0700101 public State getState(NodeId nodeId) {
102 State state = states.get(nodeId);
103 return state == null ? State.INACTIVE : state;
104 }
105
106 @Override
107 public void removeNode(NodeId nodeId) {
108 synchronized (this) {
109 rawNodes.remove(serialize(nodeId));
110 nodes.invalidate(nodeId);
111 }
112 }
113
114 // Adds a new node based on the specified member
tom0755a362014-09-24 11:54:43 -0700115 private synchronized ControllerNode addMember(Member member) {
tomb41d1ac2014-09-24 01:51:24 -0700116 DefaultControllerNode node = node(member);
117 rawNodes.put(serialize(node.id()), serialize(node));
118 nodes.put(node.id(), Optional.of(node));
119 states.put(node.id(), State.ACTIVE);
tom0755a362014-09-24 11:54:43 -0700120 return node;
tom2d7c65f2014-09-23 01:09:35 -0700121 }
122
123 // Creates a controller node descriptor from the Hazelcast member.
tomb41d1ac2014-09-24 01:51:24 -0700124 private DefaultControllerNode node(Member member) {
125 IpPrefix ip = memberAddress(member);
126 return new DefaultControllerNode(new NodeId(ip.toString()), ip);
127 }
128
129 private IpPrefix memberAddress(Member member) {
130 byte[] address = member.getSocketAddress().getAddress().getAddress();
131 return IpPrefix.valueOf(address);
132 }
133
134 // Interceptor for membership events.
tom0755a362014-09-24 11:54:43 -0700135 private class InternalMembershipListener implements MembershipListener {
tomb41d1ac2014-09-24 01:51:24 -0700136 @Override
137 public void memberAdded(MembershipEvent membershipEvent) {
138 log.info("Member {} added", membershipEvent.getMember());
tom0755a362014-09-24 11:54:43 -0700139 ControllerNode node = addMember(membershipEvent.getMember());
140 notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
tomb41d1ac2014-09-24 01:51:24 -0700141 }
142
143 @Override
144 public void memberRemoved(MembershipEvent membershipEvent) {
145 log.info("Member {} removed", membershipEvent.getMember());
tom0755a362014-09-24 11:54:43 -0700146 NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
147 states.put(nodeId, State.INACTIVE);
148 notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
tomb41d1ac2014-09-24 01:51:24 -0700149 }
150
151 @Override
152 public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
153 log.info("Member {} attribute {} changed to {}",
154 memberAttributeEvent.getMember(),
155 memberAttributeEvent.getKey(),
156 memberAttributeEvent.getValue());
157 }
tom2d7c65f2014-09-23 01:09:35 -0700158 }
159}