blob: f83ac59e2a889c7b0f94f9baeeab37e389d0f0a1 [file] [log] [blame]
tom2d7c65f2014-09-23 01:09:35 -07001package org.onlab.onos.store.cluster.impl;
2
tomb41d1ac2014-09-24 01:51:24 -07003import com.google.common.base.Optional;
4import com.google.common.cache.LoadingCache;
tom2d7c65f2014-09-23 01:09:35 -07005import com.google.common.collect.ImmutableSet;
tomb41d1ac2014-09-24 01:51:24 -07006import com.hazelcast.core.IMap;
tom2d7c65f2014-09-23 01:09:35 -07007import com.hazelcast.core.Member;
tomb41d1ac2014-09-24 01:51:24 -07008import com.hazelcast.core.MemberAttributeEvent;
9import com.hazelcast.core.MembershipEvent;
10import com.hazelcast.core.MembershipListener;
Yuta HIGUCHIb5df76d2014-09-27 20:54:00 -070011
tom2d7c65f2014-09-23 01:09:35 -070012import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
tom2d7c65f2014-09-23 01:09:35 -070015import org.apache.felix.scr.annotations.Service;
tom0755a362014-09-24 11:54:43 -070016import org.onlab.onos.cluster.ClusterEvent;
tom2d7c65f2014-09-23 01:09:35 -070017import org.onlab.onos.cluster.ClusterStore;
tom0755a362014-09-24 11:54:43 -070018import org.onlab.onos.cluster.ClusterStoreDelegate;
tom2d7c65f2014-09-23 01:09:35 -070019import org.onlab.onos.cluster.ControllerNode;
20import org.onlab.onos.cluster.DefaultControllerNode;
21import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIb5df76d2014-09-27 20:54:00 -070022import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
23import org.onlab.onos.store.common.AbstractHazelcastStore;
24import org.onlab.onos.store.common.OptionalCacheLoader;
tom2d7c65f2014-09-23 01:09:35 -070025import org.onlab.packet.IpPrefix;
tom2d7c65f2014-09-23 01:09:35 -070026
tomb41d1ac2014-09-24 01:51:24 -070027import java.util.Map;
tom2d7c65f2014-09-23 01:09:35 -070028import java.util.Set;
tomb41d1ac2014-09-24 01:51:24 -070029import java.util.concurrent.ConcurrentHashMap;
tom2d7c65f2014-09-23 01:09:35 -070030
tomb41d1ac2014-09-24 01:51:24 -070031import static com.google.common.cache.CacheBuilder.newBuilder;
tom0755a362014-09-24 11:54:43 -070032import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
33import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
tomb41d1ac2014-09-24 01:51:24 -070034import static org.onlab.onos.cluster.ControllerNode.State;
tom2d7c65f2014-09-23 01:09:35 -070035
36/**
37 * Distributed implementation of the cluster nodes store.
38 */
39@Component(immediate = true)
40@Service
tom0755a362014-09-24 11:54:43 -070041public class DistributedClusterStore
Yuta HIGUCHI2e963892014-09-27 13:00:39 -070042 extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
tomb41d1ac2014-09-24 01:51:24 -070043 implements ClusterStore {
tom2d7c65f2014-09-23 01:09:35 -070044
tomb41d1ac2014-09-24 01:51:24 -070045 private IMap<byte[], byte[]> rawNodes;
46 private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
tom2d7c65f2014-09-23 01:09:35 -070047
tomb41d1ac2014-09-24 01:51:24 -070048 private String listenerId;
tom0755a362014-09-24 11:54:43 -070049 private final MembershipListener listener = new InternalMembershipListener();
tomb41d1ac2014-09-24 01:51:24 -070050 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
tom2d7c65f2014-09-23 01:09:35 -070051
Yuta HIGUCHIad4c2182014-09-29 11:16:23 -070052 @Override
tom2d7c65f2014-09-23 01:09:35 -070053 @Activate
54 public void activate() {
tomb41d1ac2014-09-24 01:51:24 -070055 super.activate();
56 listenerId = theInstance.getCluster().addMembershipListener(listener);
tom2d7c65f2014-09-23 01:09:35 -070057
tomb41d1ac2014-09-24 01:51:24 -070058 rawNodes = theInstance.getMap("nodes");
59 OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
Yuta HIGUCHIad4c2182014-09-29 11:16:23 -070060 = new OptionalCacheLoader<>(kryoSerializationService, rawNodes);
tomb41d1ac2014-09-24 01:51:24 -070061 nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
Yuta HIGUCHIfec9e192014-09-28 14:58:02 -070062 rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
tomb41d1ac2014-09-24 01:51:24 -070063
64 loadClusterNodes();
65
66 log.info("Started");
67 }
68
69 // Loads the initial set of cluster nodes
70 private void loadClusterNodes() {
71 for (Member member : theInstance.getCluster().getMembers()) {
tomee49c372014-09-26 15:14:50 -070072 addNode(node(member));
tomb41d1ac2014-09-24 01:51:24 -070073 }
tom2d7c65f2014-09-23 01:09:35 -070074 }
75
76 @Deactivate
77 public void deactivate() {
tomb41d1ac2014-09-24 01:51:24 -070078 theInstance.getCluster().removeMembershipListener(listenerId);
tom2d7c65f2014-09-23 01:09:35 -070079 log.info("Stopped");
80 }
81
82 @Override
83 public ControllerNode getLocalNode() {
84 return node(theInstance.getCluster().getLocalMember());
85 }
86
87 @Override
88 public Set<ControllerNode> getNodes() {
89 ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
tomb41d1ac2014-09-24 01:51:24 -070090 for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
91 builder.add(optional.get());
tom2d7c65f2014-09-23 01:09:35 -070092 }
93 return builder.build();
94 }
95
96 @Override
97 public ControllerNode getNode(NodeId nodeId) {
tomb41d1ac2014-09-24 01:51:24 -070098 return nodes.getUnchecked(nodeId).orNull();
tom2d7c65f2014-09-23 01:09:35 -070099 }
100
101 @Override
tomb41d1ac2014-09-24 01:51:24 -0700102 public State getState(NodeId nodeId) {
103 State state = states.get(nodeId);
104 return state == null ? State.INACTIVE : state;
105 }
106
107 @Override
tomee49c372014-09-26 15:14:50 -0700108 public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
109 return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
110 }
111
112 @Override
tomb41d1ac2014-09-24 01:51:24 -0700113 public void removeNode(NodeId nodeId) {
114 synchronized (this) {
115 rawNodes.remove(serialize(nodeId));
116 nodes.invalidate(nodeId);
117 }
118 }
119
120 // Adds a new node based on the specified member
tomee49c372014-09-26 15:14:50 -0700121 private synchronized ControllerNode addNode(DefaultControllerNode node) {
tomb41d1ac2014-09-24 01:51:24 -0700122 rawNodes.put(serialize(node.id()), serialize(node));
123 nodes.put(node.id(), Optional.of(node));
124 states.put(node.id(), State.ACTIVE);
tom0755a362014-09-24 11:54:43 -0700125 return node;
tom2d7c65f2014-09-23 01:09:35 -0700126 }
127
128 // Creates a controller node descriptor from the Hazelcast member.
tomb41d1ac2014-09-24 01:51:24 -0700129 private DefaultControllerNode node(Member member) {
130 IpPrefix ip = memberAddress(member);
131 return new DefaultControllerNode(new NodeId(ip.toString()), ip);
132 }
133
134 private IpPrefix memberAddress(Member member) {
135 byte[] address = member.getSocketAddress().getAddress().getAddress();
136 return IpPrefix.valueOf(address);
137 }
138
139 // Interceptor for membership events.
tom0755a362014-09-24 11:54:43 -0700140 private class InternalMembershipListener implements MembershipListener {
tomb41d1ac2014-09-24 01:51:24 -0700141 @Override
142 public void memberAdded(MembershipEvent membershipEvent) {
143 log.info("Member {} added", membershipEvent.getMember());
tomee49c372014-09-26 15:14:50 -0700144 ControllerNode node = addNode(node(membershipEvent.getMember()));
tom0755a362014-09-24 11:54:43 -0700145 notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
tomb41d1ac2014-09-24 01:51:24 -0700146 }
147
148 @Override
149 public void memberRemoved(MembershipEvent membershipEvent) {
150 log.info("Member {} removed", membershipEvent.getMember());
tom0755a362014-09-24 11:54:43 -0700151 NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
152 states.put(nodeId, State.INACTIVE);
153 notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
tomb41d1ac2014-09-24 01:51:24 -0700154 }
155
156 @Override
157 public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
158 log.info("Member {} attribute {} changed to {}",
159 memberAttributeEvent.getMember(),
160 memberAttributeEvent.getKey(),
161 memberAttributeEvent.getValue());
162 }
tom2d7c65f2014-09-23 01:09:35 -0700163 }
164}