blob: 933c8ec30a726d2549928c9e323d29b773813666 [file] [log] [blame]
Jordan Halterman00e92da2018-05-22 23:05:52 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Thomas Vachuskab6d31672018-07-27 17:03:46 -070016package org.onosproject.store.atomix.cluster.impl;
Jordan Halterman00e92da2018-05-22 23:05:52 -070017
Jordan Halterman00e92da2018-05-22 23:05:52 -070018import com.google.common.collect.Maps;
19import io.atomix.cluster.ClusterMembershipEvent;
20import io.atomix.cluster.ClusterMembershipEventListener;
21import io.atomix.cluster.ClusterMembershipService;
22import io.atomix.cluster.Member;
Jordan Halterman00e92da2018-05-22 23:05:52 -070023import org.onlab.packet.IpAddress;
24import org.onosproject.cluster.ClusterEvent;
25import org.onosproject.cluster.ClusterStore;
26import org.onosproject.cluster.ClusterStoreDelegate;
27import org.onosproject.cluster.ControllerNode;
28import org.onosproject.cluster.DefaultControllerNode;
29import org.onosproject.cluster.Node;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.core.Version;
32import org.onosproject.core.VersionService;
33import org.onosproject.store.AbstractStore;
Thomas Vachuskab6d31672018-07-27 17:03:46 -070034import org.onosproject.store.atomix.impl.AtomixManager;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070035import org.osgi.service.component.annotations.Activate;
36import org.osgi.service.component.annotations.Component;
37import org.osgi.service.component.annotations.Deactivate;
38import org.osgi.service.component.annotations.Reference;
39import org.osgi.service.component.annotations.ReferenceCardinality;
Jordan Halterman00e92da2018-05-22 23:05:52 -070040import org.slf4j.Logger;
41import org.slf4j.LoggerFactory;
42
Ray Milkeyd84f89b2018-08-17 14:54:17 -070043import java.time.Instant;
44import java.util.Map;
45import java.util.Objects;
46import java.util.Set;
47import java.util.stream.Collectors;
48
Jordan Halterman00e92da2018-05-22 23:05:52 -070049import static com.google.common.base.Preconditions.checkNotNull;
50
51/**
52 * Atomix cluster store.
53 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070054@Component(immediate = true, service = ClusterStore.class)
Jordan Halterman00e92da2018-05-22 23:05:52 -070055public class AtomixClusterStore extends AbstractStore<ClusterEvent, ClusterStoreDelegate> implements ClusterStore {
56 private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
57
58 private static final String STATE_KEY = "state";
59 private static final String VERSION_KEY = "version";
60
61 private final Logger log = LoggerFactory.getLogger(getClass());
62
Ray Milkeyd84f89b2018-08-17 14:54:17 -070063 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman00e92da2018-05-22 23:05:52 -070064 protected AtomixManager atomixManager;
65
Ray Milkeyd84f89b2018-08-17 14:54:17 -070066 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman00e92da2018-05-22 23:05:52 -070067 protected VersionService versionService;
68
69 private ClusterMembershipService membershipService;
70 private ControllerNode localNode;
71 private final Map<NodeId, ControllerNode> nodes = Maps.newConcurrentMap();
72 private final Map<NodeId, ControllerNode.State> states = Maps.newConcurrentMap();
73 private final Map<NodeId, Version> versions = Maps.newConcurrentMap();
74 private final Map<NodeId, Instant> updates = Maps.newConcurrentMap();
75 private final ClusterMembershipEventListener membershipEventListener = this::changeMembership;
76
77 @Activate
78 public void activate() {
79 membershipService = atomixManager.getAtomix().getMembershipService();
80 membershipService.addListener(membershipEventListener);
81 membershipService.getMembers().forEach(member -> {
82 ControllerNode node = toControllerNode(member);
83 nodes.put(node.id(), node);
84 updateState(node, member);
85 updateVersion(node, member);
86 });
87 membershipService.getLocalMember().properties().put(STATE_KEY, ControllerNode.State.ACTIVE.name());
88 membershipService.getLocalMember().properties().put(VERSION_KEY, versionService.version().toString());
89 localNode = toControllerNode(membershipService.getLocalMember());
90 states.put(localNode.id(), ControllerNode.State.ACTIVE);
91 versions.put(localNode.id(), versionService.version());
92 log.info("Started");
93 }
94
95 @Deactivate
96 public void deactivate() {
97 membershipService.removeListener(membershipEventListener);
98 log.info("Stopped");
99 }
100
101 private void changeMembership(ClusterMembershipEvent event) {
102 ControllerNode node = nodes.get(NodeId.nodeId(event.subject().id().id()));
103 switch (event.type()) {
104 case MEMBER_ADDED:
105 case METADATA_CHANGED:
106 if (node == null) {
107 node = toControllerNode(event.subject());
108 nodes.put(node.id(), node);
109 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
110 }
111 updateVersion(node, event.subject());
112 updateState(node, event.subject());
113 break;
114 case MEMBER_REMOVED:
115 if (node != null
116 && states.put(node.id(), ControllerNode.State.INACTIVE) != ControllerNode.State.INACTIVE) {
117 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
118 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
119 }
120 break;
121 default:
122 break;
123 }
124 }
125
126 private void updateState(ControllerNode node, Member member) {
127 String state = member.properties().getProperty(STATE_KEY);
128 if (state == null || !state.equals(ControllerNode.State.READY.name())) {
129 if (states.put(node.id(), ControllerNode.State.ACTIVE) != ControllerNode.State.ACTIVE) {
130 log.info("Updated node {} state to {}", node.id(), ControllerNode.State.ACTIVE);
131 markUpdated(node.id());
132 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
133 }
134 } else {
135 if (states.put(node.id(), ControllerNode.State.READY) != ControllerNode.State.READY) {
136 log.info("Updated node {} state to {}", node.id(), ControllerNode.State.READY);
137 markUpdated(node.id());
138 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_READY, node));
139 }
140 }
141 }
142
143 private void updateVersion(ControllerNode node, Member member) {
144 String versionString = member.properties().getProperty(VERSION_KEY);
145 if (versionString != null) {
146 Version version = Version.version(versionString);
147 if (!Objects.equals(versions.put(node.id(), version), version)) {
148 log.info("Updated node {} version to {}", node.id(), version);
149 }
150 }
151 }
152
153 private void markUpdated(NodeId nodeId) {
154 updates.put(nodeId, Instant.now());
155 }
156
157 private ControllerNode toControllerNode(Member member) {
158 return new DefaultControllerNode(
159 NodeId.nodeId(member.id().id()),
160 IpAddress.valueOf(member.address().address()),
161 member.address().port());
162 }
163
164 @Override
165 public ControllerNode getLocalNode() {
166 return toControllerNode(membershipService.getLocalMember());
167 }
168
169 @Override
170 public Set<Node> getStorageNodes() {
171 return membershipService.getMembers()
172 .stream()
173 .filter(member -> !Objects.equals(member.properties().getProperty("type"), "onos"))
174 .map(this::toControllerNode)
175 .collect(Collectors.toSet());
176 }
177
178 @Override
179 public Set<ControllerNode> getNodes() {
180 return membershipService.getMembers()
181 .stream()
182 .filter(member -> Objects.equals(member.properties().getProperty("type"), "onos"))
183 .map(this::toControllerNode)
184 .collect(Collectors.toSet());
185 }
186
187 @Override
188 public ControllerNode getNode(NodeId nodeId) {
189 Member member = membershipService.getMember(nodeId.id());
190 return member != null ? toControllerNode(member) : null;
191 }
192
193 @Override
194 public ControllerNode.State getState(NodeId nodeId) {
195 checkNotNull(nodeId, INSTANCE_ID_NULL);
196 return states.get(nodeId);
197 }
198
199 @Override
200 public Version getVersion(NodeId nodeId) {
201 checkNotNull(nodeId, INSTANCE_ID_NULL);
202 return versions.get(nodeId);
203 }
204
205 @Override
206 public Instant getLastUpdatedInstant(NodeId nodeId) {
207 checkNotNull(nodeId, INSTANCE_ID_NULL);
208 return updates.get(nodeId);
209 }
210
211 @Override
212 public void markFullyStarted(boolean started) {
213 ControllerNode.State state = started ? ControllerNode.State.READY : ControllerNode.State.ACTIVE;
214 states.put(localNode.id(), state);
215 membershipService.getLocalMember().properties().setProperty(STATE_KEY, state.name());
216 }
217
218 @Override
219 public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
220 checkNotNull(nodeId, INSTANCE_ID_NULL);
221 ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
222 nodes.put(node.id(), node);
223 ControllerNode.State state = node.equals(localNode)
224 ? ControllerNode.State.ACTIVE : ControllerNode.State.INACTIVE;
225 membershipService.getMember(node.id().id()).properties().setProperty(STATE_KEY, state.name());
226 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
227 return node;
228 }
229
230 @Override
231 public void removeNode(NodeId nodeId) {
232 checkNotNull(nodeId, INSTANCE_ID_NULL);
233 ControllerNode node = nodes.remove(nodeId);
234 if (node != null) {
235 states.remove(nodeId);
236 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
237 }
238 }
239}