/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.cluster.impl;
17
18import java.time.Instant;
19import java.util.Map;
20import java.util.Objects;
21import java.util.Set;
22import java.util.stream.Collectors;
23
24import com.google.common.collect.Maps;
25import io.atomix.cluster.ClusterMembershipEvent;
26import io.atomix.cluster.ClusterMembershipEventListener;
27import io.atomix.cluster.ClusterMembershipService;
28import io.atomix.cluster.Member;
29import org.apache.felix.scr.annotations.Activate;
30import org.apache.felix.scr.annotations.Component;
31import org.apache.felix.scr.annotations.Deactivate;
32import org.apache.felix.scr.annotations.Reference;
33import org.apache.felix.scr.annotations.ReferenceCardinality;
34import org.apache.felix.scr.annotations.Service;
35import org.onlab.packet.IpAddress;
36import org.onosproject.cluster.ClusterEvent;
37import org.onosproject.cluster.ClusterStore;
38import org.onosproject.cluster.ClusterStoreDelegate;
39import org.onosproject.cluster.ControllerNode;
40import org.onosproject.cluster.DefaultControllerNode;
41import org.onosproject.cluster.Node;
42import org.onosproject.cluster.NodeId;
43import org.onosproject.core.Version;
44import org.onosproject.core.VersionService;
45import org.onosproject.store.AbstractStore;
46import org.onosproject.store.impl.AtomixManager;
47import org.slf4j.Logger;
48import org.slf4j.LoggerFactory;
49
50import static com.google.common.base.Preconditions.checkNotNull;
51
52/**
53 * Atomix cluster store.
54 */
55@Component(immediate = true)
56@Service
57public class AtomixClusterStore extends AbstractStore<ClusterEvent, ClusterStoreDelegate> implements ClusterStore {
58 private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
59
60 private static final String STATE_KEY = "state";
61 private static final String VERSION_KEY = "version";
62
63 private final Logger log = LoggerFactory.getLogger(getClass());
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected AtomixManager atomixManager;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected VersionService versionService;
70
71 private ClusterMembershipService membershipService;
72 private ControllerNode localNode;
73 private final Map<NodeId, ControllerNode> nodes = Maps.newConcurrentMap();
74 private final Map<NodeId, ControllerNode.State> states = Maps.newConcurrentMap();
75 private final Map<NodeId, Version> versions = Maps.newConcurrentMap();
76 private final Map<NodeId, Instant> updates = Maps.newConcurrentMap();
77 private final ClusterMembershipEventListener membershipEventListener = this::changeMembership;
78
79 @Activate
80 public void activate() {
81 membershipService = atomixManager.getAtomix().getMembershipService();
82 membershipService.addListener(membershipEventListener);
83 membershipService.getMembers().forEach(member -> {
84 ControllerNode node = toControllerNode(member);
85 nodes.put(node.id(), node);
86 updateState(node, member);
87 updateVersion(node, member);
88 });
89 membershipService.getLocalMember().properties().put(STATE_KEY, ControllerNode.State.ACTIVE.name());
90 membershipService.getLocalMember().properties().put(VERSION_KEY, versionService.version().toString());
91 localNode = toControllerNode(membershipService.getLocalMember());
92 states.put(localNode.id(), ControllerNode.State.ACTIVE);
93 versions.put(localNode.id(), versionService.version());
94 log.info("Started");
95 }
96
97 @Deactivate
98 public void deactivate() {
99 membershipService.removeListener(membershipEventListener);
100 log.info("Stopped");
101 }
102
103 private void changeMembership(ClusterMembershipEvent event) {
104 ControllerNode node = nodes.get(NodeId.nodeId(event.subject().id().id()));
105 switch (event.type()) {
106 case MEMBER_ADDED:
107 case METADATA_CHANGED:
108 if (node == null) {
109 node = toControllerNode(event.subject());
110 nodes.put(node.id(), node);
111 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
112 }
113 updateVersion(node, event.subject());
114 updateState(node, event.subject());
115 break;
116 case MEMBER_REMOVED:
117 if (node != null
118 && states.put(node.id(), ControllerNode.State.INACTIVE) != ControllerNode.State.INACTIVE) {
119 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
120 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
121 }
122 break;
123 default:
124 break;
125 }
126 }
127
128 private void updateState(ControllerNode node, Member member) {
129 String state = member.properties().getProperty(STATE_KEY);
130 if (state == null || !state.equals(ControllerNode.State.READY.name())) {
131 if (states.put(node.id(), ControllerNode.State.ACTIVE) != ControllerNode.State.ACTIVE) {
132 log.info("Updated node {} state to {}", node.id(), ControllerNode.State.ACTIVE);
133 markUpdated(node.id());
134 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
135 }
136 } else {
137 if (states.put(node.id(), ControllerNode.State.READY) != ControllerNode.State.READY) {
138 log.info("Updated node {} state to {}", node.id(), ControllerNode.State.READY);
139 markUpdated(node.id());
140 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_READY, node));
141 }
142 }
143 }
144
145 private void updateVersion(ControllerNode node, Member member) {
146 String versionString = member.properties().getProperty(VERSION_KEY);
147 if (versionString != null) {
148 Version version = Version.version(versionString);
149 if (!Objects.equals(versions.put(node.id(), version), version)) {
150 log.info("Updated node {} version to {}", node.id(), version);
151 }
152 }
153 }
154
155 private void markUpdated(NodeId nodeId) {
156 updates.put(nodeId, Instant.now());
157 }
158
159 private ControllerNode toControllerNode(Member member) {
160 return new DefaultControllerNode(
161 NodeId.nodeId(member.id().id()),
162 IpAddress.valueOf(member.address().address()),
163 member.address().port());
164 }
165
166 @Override
167 public ControllerNode getLocalNode() {
168 return toControllerNode(membershipService.getLocalMember());
169 }
170
171 @Override
172 public Set<Node> getStorageNodes() {
173 return membershipService.getMembers()
174 .stream()
175 .filter(member -> !Objects.equals(member.properties().getProperty("type"), "onos"))
176 .map(this::toControllerNode)
177 .collect(Collectors.toSet());
178 }
179
180 @Override
181 public Set<ControllerNode> getNodes() {
182 return membershipService.getMembers()
183 .stream()
184 .filter(member -> Objects.equals(member.properties().getProperty("type"), "onos"))
185 .map(this::toControllerNode)
186 .collect(Collectors.toSet());
187 }
188
189 @Override
190 public ControllerNode getNode(NodeId nodeId) {
191 Member member = membershipService.getMember(nodeId.id());
192 return member != null ? toControllerNode(member) : null;
193 }
194
195 @Override
196 public ControllerNode.State getState(NodeId nodeId) {
197 checkNotNull(nodeId, INSTANCE_ID_NULL);
198 return states.get(nodeId);
199 }
200
201 @Override
202 public Version getVersion(NodeId nodeId) {
203 checkNotNull(nodeId, INSTANCE_ID_NULL);
204 return versions.get(nodeId);
205 }
206
207 @Override
208 public Instant getLastUpdatedInstant(NodeId nodeId) {
209 checkNotNull(nodeId, INSTANCE_ID_NULL);
210 return updates.get(nodeId);
211 }
212
213 @Override
214 public void markFullyStarted(boolean started) {
215 ControllerNode.State state = started ? ControllerNode.State.READY : ControllerNode.State.ACTIVE;
216 states.put(localNode.id(), state);
217 membershipService.getLocalMember().properties().setProperty(STATE_KEY, state.name());
218 }
219
220 @Override
221 public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
222 checkNotNull(nodeId, INSTANCE_ID_NULL);
223 ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
224 nodes.put(node.id(), node);
225 ControllerNode.State state = node.equals(localNode)
226 ? ControllerNode.State.ACTIVE : ControllerNode.State.INACTIVE;
227 membershipService.getMember(node.id().id()).properties().setProperty(STATE_KEY, state.name());
228 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
229 return node;
230 }
231
232 @Override
233 public void removeNode(NodeId nodeId) {
234 checkNotNull(nodeId, INSTANCE_ID_NULL);
235 ControllerNode node = nodes.remove(nodeId);
236 if (node != null) {
237 states.remove(nodeId);
238 notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
239 }
240 }
241}