blob: f8202fc8d76358789a1a4a879568b28cae2075c9 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Madan Jampanibd6845d2015-02-25 11:43:48 -080016package org.onosproject.store.cluster.impl;
17
18import static org.onlab.util.Tools.groupedThreads;
19import static org.slf4j.LoggerFactory.getLogger;
20
21import java.io.File;
22import java.io.IOException;
23import java.net.InetAddress;
24import java.net.NetworkInterface;
25import java.net.SocketException;
26import java.util.Enumeration;
27import java.util.Map;
28import java.util.Set;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
31import java.util.concurrent.ScheduledExecutorService;
32import java.util.concurrent.TimeUnit;
33import java.util.stream.Collectors;
34
35import org.apache.felix.scr.annotations.Activate;
36import org.apache.felix.scr.annotations.Component;
37import org.apache.felix.scr.annotations.Deactivate;
38import org.apache.felix.scr.annotations.Reference;
39import org.apache.felix.scr.annotations.ReferenceCardinality;
40import org.apache.felix.scr.annotations.Service;
41import org.onlab.netty.Endpoint;
42import org.onlab.netty.Message;
43import org.onlab.netty.MessageHandler;
44import org.onlab.netty.NettyMessagingService;
45import org.onlab.packet.IpAddress;
46import org.onlab.util.KryoNamespace;
47import org.onosproject.cluster.ClusterAdminService;
48import org.onosproject.cluster.ClusterEvent;
49import org.onosproject.cluster.ClusterEventListener;
50import org.onosproject.cluster.ClusterService;
51import org.onosproject.cluster.ControllerNode;
52import org.onosproject.cluster.ControllerNode.State;
53import org.onosproject.cluster.DefaultControllerNode;
54import org.onosproject.cluster.NodeId;
55import org.onosproject.event.AbstractListenerRegistry;
56import org.onosproject.event.EventDeliveryService;
57import org.onosproject.store.serializers.KryoNamespaces;
58import org.onosproject.store.serializers.KryoSerializer;
59import org.slf4j.Logger;
60
61import com.google.common.collect.ImmutableSet;
62import com.google.common.collect.Maps;
Madan Jampania14047d2015-02-25 12:23:02 -080063import com.hazelcast.util.AddressUtil;
Madan Jampanibd6845d2015-02-25 11:43:48 -080064
65import static com.google.common.base.Preconditions.checkNotNull;
66import static com.google.common.base.Preconditions.checkArgument;
67
68/**
69 * ClusterService implementation that employs an accrual failure
70 * detector to identify cluster member up/down status.
71 */
72@Component(immediate = true, enabled = false)
73@Service
74public class ClusterManager implements ClusterService, ClusterAdminService {
75
76 private final Logger log = getLogger(getClass());
77
78 protected final AbstractListenerRegistry<ClusterEvent, ClusterEventListener>
Madan Jampania14047d2015-02-25 12:23:02 -080079 listenerRegistry = new AbstractListenerRegistry<>();
Madan Jampanibd6845d2015-02-25 11:43:48 -080080
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected EventDeliveryService eventDispatcher;
83
84 // TODO: make these configurable.
85 private static final int HEARTBEAT_FD_PORT = 2419;
86 private static final int HEARTBEAT_INTERVAL_MS = 100;
87 private static final int PHI_FAILURE_THRESHOLD = 10;
88
89 private static final String CONFIG_DIR = "../config";
90 private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
91
Madan Jampania14047d2015-02-25 12:23:02 -080092 private ClusterDefinition clusterDefinition;
Madan Jampanibd6845d2015-02-25 11:43:48 -080093
94 private Set<ControllerNode> seedNodes;
95 private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
96 private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
97 private NettyMessagingService messagingService = new NettyMessagingService();
98 private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
99 groupedThreads("onos/cluster/membership", "heartbeat-sender"));
100 private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
101 groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
102
103 private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
104
105
106 private PhiAccrualFailureDetector failureDetector;
107
108 private ControllerNode localNode;
109
110 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
111 @Override
112 protected void setupKryoPool() {
113 serializerPool = KryoNamespace.newBuilder()
114 .register(KryoNamespaces.API)
115 .register(HeartbeatMessage.class)
116 .build()
117 .populate(1);
118 }
119 };
120
121 private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
122
123 @Activate
124 public void activate() {
125
126 File clusterDefinitionFile = new File(CONFIG_DIR, CLUSTER_DEFINITION_FILE);
Madan Jampania14047d2015-02-25 12:23:02 -0800127
Madan Jampanibd6845d2015-02-25 11:43:48 -0800128 try {
Madan Jampania14047d2015-02-25 12:23:02 -0800129 clusterDefinition = new ClusterDefinitionStore(clusterDefinitionFile.getPath()).read();
Madan Jampani0cb00672015-02-27 00:27:22 -0800130 seedNodes = ImmutableSet.copyOf(clusterDefinition.getNodes())
131 .stream()
132 .map(nodeInfo -> new DefaultControllerNode(
133 new NodeId(nodeInfo.getId()),
134 IpAddress.valueOf(nodeInfo.getIp()),
135 nodeInfo.getTcpPort()))
136 .collect(Collectors.toSet());
Madan Jampanibd6845d2015-02-25 11:43:48 -0800137 } catch (IOException e) {
Madan Jampani52860be2015-02-27 12:52:37 -0800138 throw new IllegalStateException("Failed to read cluster definition.", e);
Madan Jampanibd6845d2015-02-25 11:43:48 -0800139 }
140
141 seedNodes.forEach(node -> {
142 allNodes.put(node.id(), node);
143 nodeStates.put(node.id(), State.INACTIVE);
144 });
145
146 establishSelfIdentity();
147
148 messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
149
150 try {
151 messagingService.activate();
152 } catch (InterruptedException e) {
153 Thread.currentThread().interrupt();
Madan Jampania14047d2015-02-25 12:23:02 -0800154 throw new IllegalStateException("Failed to cleanly initialize membership and"
Madan Jampanibd6845d2015-02-25 11:43:48 -0800155 + " failure detector communication channel.", e);
156 }
157 messagingService.registerHandler(
158 HEARTBEAT_MESSAGE,
159 new HeartbeatMessageHandler(),
160 heartBeatMessageHandler);
161
162 eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
163 failureDetector = new PhiAccrualFailureDetector();
164
165 heartBeatSender.scheduleWithFixedDelay(
166 this::heartbeat,
167 0,
168 HEARTBEAT_INTERVAL_MS,
169 TimeUnit.MILLISECONDS);
170
171 log.info("Started");
172 }
173
174 @Deactivate
175 public void deactivate() {
176 try {
177 messagingService.deactivate();
178 } catch (Exception e) {
179 log.trace("Failed to cleanly shutdown cluster membership messaging", e);
180 }
181
Madan Jampania14047d2015-02-25 12:23:02 -0800182 heartBeatSender.shutdownNow();
183 heartBeatMessageHandler.shutdownNow();
Madan Jampanibd6845d2015-02-25 11:43:48 -0800184 eventDispatcher.removeSink(ClusterEvent.class);
185
186 log.info("Stopped");
187 }
188
189 @Override
190 public ControllerNode getLocalNode() {
191 return localNode;
192 }
193
194 @Override
195 public Set<ControllerNode> getNodes() {
196 return ImmutableSet.copyOf(allNodes.values());
197 }
198
199 @Override
200 public ControllerNode getNode(NodeId nodeId) {
201 checkNotNull(nodeId, INSTANCE_ID_NULL);
202 return allNodes.get(nodeId);
203 }
204
205 @Override
206 public State getState(NodeId nodeId) {
207 checkNotNull(nodeId, INSTANCE_ID_NULL);
208 return nodeStates.get(nodeId);
209 }
210
211 @Override
212 public void addListener(ClusterEventListener listener) {
213 checkNotNull(listener, "Listener must not be null");
214 listenerRegistry.addListener(listener);
215 }
216
217 @Override
218 public void removeListener(ClusterEventListener listener) {
219 checkNotNull(listener, "Listener must not be null");
220 listenerRegistry.removeListener(listener);
221 }
222
223 @Override
224 public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
225 checkNotNull(nodeId, INSTANCE_ID_NULL);
226 checkNotNull(ip, "IP address must not be null");
227 checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
228 ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
229 allNodes.put(node.id(), node);
230 nodeStates.put(nodeId, State.INACTIVE);
231 eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
232 return node;
233 }
234
235 @Override
236 public void removeNode(NodeId nodeId) {
237 checkNotNull(nodeId, INSTANCE_ID_NULL);
238 ControllerNode node = allNodes.remove(nodeId);
239 if (node != null) {
240 nodeStates.remove(nodeId);
241 eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
242 }
243 }
244
245 private void establishSelfIdentity() {
246 try {
247 IpAddress ip = findLocalIp();
248 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
249 allNodes.put(localNode.id(), localNode);
250 nodeStates.put(localNode.id(), State.ACTIVE);
251 log.info("Local Node: {}", localNode);
252 } catch (SocketException e) {
253 throw new IllegalStateException("Cannot determine local IP", e);
254 }
255 }
256
257 private void heartbeat() {
258 try {
259 Set<ControllerNode> peers = allNodes.values()
260 .stream()
261 .filter(node -> !(node.id().equals(localNode.id())))
262 .collect(Collectors.toSet());
263 byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
264 peers.forEach((node) -> {
265 heartbeatToPeer(hbMessagePayload, node);
266 State currentState = nodeStates.get(node.id());
267 double phi = failureDetector.phi(node.id());
268 if (phi >= PHI_FAILURE_THRESHOLD) {
269 if (currentState == State.ACTIVE) {
270 nodeStates.put(node.id(), State.INACTIVE);
271 notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
272 }
273 } else {
274 if (currentState == State.INACTIVE) {
275 nodeStates.put(node.id(), State.ACTIVE);
276 notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
277 }
278 }
279 });
280 } catch (Exception e) {
Madan Jampani52860be2015-02-27 12:52:37 -0800281 log.debug("Failed to send heartbeat", e);
Madan Jampanibd6845d2015-02-25 11:43:48 -0800282 }
283 }
284
285 private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
286 if (newState == State.ACTIVE) {
287 eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, allNodes.get(nodeId)));
288 } else {
289 eventDispatcher.post(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, allNodes.get(nodeId)));
290 }
291 }
292
293 private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
294 Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
295 try {
296 messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
297 } catch (IOException e) {
Madan Jampani52860be2015-02-27 12:52:37 -0800298 log.debug("Sending heartbeat to {} failed", remoteEp, e);
Madan Jampanibd6845d2015-02-25 11:43:48 -0800299 }
300 }
301
302 private class HeartbeatMessageHandler implements MessageHandler {
303 @Override
304 public void handle(Message message) throws IOException {
305 HeartbeatMessage hb = SERIALIZER.decode(message.payload());
306 failureDetector.report(hb.source().id());
307 hb.knownPeers().forEach(node -> {
308 allNodes.put(node.id(), node);
309 });
310 }
311 }
312
Madan Jampania14047d2015-02-25 12:23:02 -0800313 private static class HeartbeatMessage {
Madan Jampanibd6845d2015-02-25 11:43:48 -0800314 private ControllerNode source;
315 private Set<ControllerNode> knownPeers;
316
317 public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
318 this.source = source;
319 this.knownPeers = ImmutableSet.copyOf(members);
320 }
321
322 public ControllerNode source() {
323 return source;
324 }
325
326 public Set<ControllerNode> knownPeers() {
327 return knownPeers;
328 }
329 }
330
331 private IpAddress findLocalIp() throws SocketException {
Madan Jampania14047d2015-02-25 12:23:02 -0800332 Enumeration<NetworkInterface> interfaces =
333 NetworkInterface.getNetworkInterfaces();
334 while (interfaces.hasMoreElements()) {
335 NetworkInterface iface = interfaces.nextElement();
336 Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
337 while (inetAddresses.hasMoreElements()) {
338 IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
Madan Jampani0cb00672015-02-27 00:27:22 -0800339 if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
Madan Jampania14047d2015-02-25 12:23:02 -0800340 return ip;
341 }
Madan Jampanibd6845d2015-02-25 11:43:48 -0800342 }
343 }
344 throw new IllegalStateException("Unable to determine local ip");
345 }
Madan Jampani0cb00672015-02-27 00:27:22 -0800346}