blob: dec2c23c026a9a8193be40fdde7ca94b72cf59a8 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.impl;
tom2d7c65f2014-09-23 01:09:35 -070017
Jonathan Hart4a4d18f2015-03-26 12:16:16 -070018import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Maps;
20import com.hazelcast.util.AddressUtil;
tom2d7c65f2014-09-23 01:09:35 -070021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
tom2d7c65f2014-09-23 01:09:35 -070024import org.apache.felix.scr.annotations.Service;
Madan Jampani7d2fab22015-03-18 17:21:57 -070025import org.joda.time.DateTime;
Ayaka Koshibedd91b842015-03-02 14:48:47 -080026import org.onlab.netty.Endpoint;
27import org.onlab.netty.Message;
28import org.onlab.netty.MessageHandler;
29import org.onlab.netty.NettyMessagingService;
30import org.onlab.packet.IpAddress;
31import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.cluster.ClusterEvent;
33import org.onosproject.cluster.ClusterStore;
34import org.onosproject.cluster.ClusterStoreDelegate;
35import org.onosproject.cluster.ControllerNode;
Ayaka Koshibedd91b842015-03-02 14:48:47 -080036import org.onosproject.cluster.ControllerNode.State;
Jonathan Hart4a4d18f2015-03-26 12:16:16 -070037import org.onosproject.cluster.DefaultControllerNode;
Brian O'Connorabafb502014-12-02 22:26:20 -080038import org.onosproject.cluster.NodeId;
Ayaka Koshibedd91b842015-03-02 14:48:47 -080039import org.onosproject.store.AbstractStore;
40import org.onosproject.store.serializers.KryoNamespaces;
41import org.onosproject.store.serializers.KryoSerializer;
42import org.slf4j.Logger;
tom2d7c65f2014-09-23 01:09:35 -070043
Jonathan Hart4a4d18f2015-03-26 12:16:16 -070044import java.io.File;
45import java.io.IOException;
46import java.net.InetAddress;
47import java.net.NetworkInterface;
48import java.net.SocketException;
49import java.util.Enumeration;
50import java.util.Map;
51import java.util.Set;
52import java.util.concurrent.ExecutorService;
53import java.util.concurrent.Executors;
54import java.util.concurrent.ScheduledExecutorService;
55import java.util.concurrent.TimeUnit;
56import java.util.stream.Collectors;
57
58import static com.google.common.base.Preconditions.checkArgument;
59import static com.google.common.base.Preconditions.checkNotNull;
60import static org.onlab.util.Tools.groupedThreads;
61import static org.slf4j.LoggerFactory.getLogger;
tom2d7c65f2014-09-23 01:09:35 -070062
tom2d7c65f2014-09-23 01:09:35 -070063@Component(immediate = true)
64@Service
Ayaka Koshibedd91b842015-03-02 14:48:47 -080065/**
66 * Distributed cluster nodes store that employs an accrual failure
67 * detector to identify cluster member up/down status.
68 */
tom0755a362014-09-24 11:54:43 -070069public class DistributedClusterStore
Ayaka Koshibedd91b842015-03-02 14:48:47 -080070 extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
tomb41d1ac2014-09-24 01:51:24 -070071 implements ClusterStore {
tom2d7c65f2014-09-23 01:09:35 -070072
Ayaka Koshibedd91b842015-03-02 14:48:47 -080073 private final Logger log = getLogger(DistributedClusterStore.class);
tom2d7c65f2014-09-23 01:09:35 -070074
Ayaka Koshibedd91b842015-03-02 14:48:47 -080075 // TODO: make these configurable.
76 private static final int HEARTBEAT_FD_PORT = 2419;
77 private static final int HEARTBEAT_INTERVAL_MS = 100;
78 private static final int PHI_FAILURE_THRESHOLD = 10;
tom2d7c65f2014-09-23 01:09:35 -070079
Ayaka Koshibedd91b842015-03-02 14:48:47 -080080 private static final String CONFIG_DIR = "../config";
81 private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
82 private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -080083
Ayaka Koshibedd91b842015-03-02 14:48:47 -080084 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
85 @Override
86 protected void setupKryoPool() {
87 serializerPool = KryoNamespace.newBuilder()
88 .register(KryoNamespaces.API)
89 .register(HeartbeatMessage.class)
90 .build()
91 .populate(1);
92 }
93 };
94
95 private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
96
97 private ClusterDefinition clusterDefinition;
98
99 private Set<ControllerNode> seedNodes;
100 private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
101 private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
Madan Jampani7d2fab22015-03-18 17:21:57 -0700102 private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800103 private NettyMessagingService messagingService = new NettyMessagingService();
104 private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
105 groupedThreads("onos/cluster/membership", "heartbeat-sender"));
106 private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
107 groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
108
109 private PhiAccrualFailureDetector failureDetector;
110
111 private ControllerNode localNode;
112
tom2d7c65f2014-09-23 01:09:35 -0700113 @Activate
114 public void activate() {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800115 File clusterDefinitionFile = new File(CONFIG_DIR,
116 CLUSTER_DEFINITION_FILE);
tom2d7c65f2014-09-23 01:09:35 -0700117
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800118 try {
119 clusterDefinition = new ClusterDefinitionStore(
120 clusterDefinitionFile.getPath()).read();
121 seedNodes = ImmutableSet
122 .copyOf(clusterDefinition.getNodes())
123 .stream()
124 .map(nodeInfo -> new DefaultControllerNode(new NodeId(
125 nodeInfo.getId()), IpAddress.valueOf(nodeInfo
126 .getIp()), nodeInfo.getTcpPort()))
127 .collect(Collectors.toSet());
128 } catch (IOException e) {
129 throw new IllegalStateException(
130 "Failed to read cluster definition.", e);
131 }
tomb41d1ac2014-09-24 01:51:24 -0700132
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800133 seedNodes.forEach(node -> {
134 allNodes.put(node.id(), node);
Madan Jampani7d2fab22015-03-18 17:21:57 -0700135 updateState(node.id(), State.INACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800136 });
137
138 establishSelfIdentity();
139
140 messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
141
142 try {
143 messagingService.activate();
144 } catch (InterruptedException e) {
145 Thread.currentThread().interrupt();
146 throw new IllegalStateException(
147 "Failed to cleanly initialize membership and"
148 + " failure detector communication channel.", e);
149 }
150 messagingService.registerHandler(HEARTBEAT_MESSAGE,
151 new HeartbeatMessageHandler(), heartBeatMessageHandler);
152
153 failureDetector = new PhiAccrualFailureDetector();
154
155 heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
156 HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
tomb41d1ac2014-09-24 01:51:24 -0700157
158 log.info("Started");
159 }
160
tom2d7c65f2014-09-23 01:09:35 -0700161 @Deactivate
162 public void deactivate() {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800163 try {
164 messagingService.deactivate();
165 } catch (Exception e) {
166 log.trace("Failed to cleanly shutdown cluster membership messaging", e);
167 }
168
169 heartBeatSender.shutdownNow();
170 heartBeatMessageHandler.shutdownNow();
171
tom2d7c65f2014-09-23 01:09:35 -0700172 log.info("Stopped");
173 }
174
175 @Override
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800176 public void setDelegate(ClusterStoreDelegate delegate) {
177 checkNotNull(delegate, "Delegate cannot be null");
178 this.delegate = delegate;
179 }
180
181 @Override
182 public void unsetDelegate(ClusterStoreDelegate delegate) {
183 this.delegate = null;
184 }
185
186 @Override
187 public boolean hasDelegate() {
188 return this.delegate != null;
189 }
190
191 @Override
tom2d7c65f2014-09-23 01:09:35 -0700192 public ControllerNode getLocalNode() {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800193 return localNode;
tom2d7c65f2014-09-23 01:09:35 -0700194 }
195
196 @Override
197 public Set<ControllerNode> getNodes() {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800198 return ImmutableSet.copyOf(allNodes.values());
tom2d7c65f2014-09-23 01:09:35 -0700199 }
200
201 @Override
202 public ControllerNode getNode(NodeId nodeId) {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800203 checkNotNull(nodeId, INSTANCE_ID_NULL);
204 return allNodes.get(nodeId);
tom2d7c65f2014-09-23 01:09:35 -0700205 }
206
207 @Override
tomb41d1ac2014-09-24 01:51:24 -0700208 public State getState(NodeId nodeId) {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800209 checkNotNull(nodeId, INSTANCE_ID_NULL);
210 return nodeStates.get(nodeId);
tomb41d1ac2014-09-24 01:51:24 -0700211 }
212
213 @Override
Pavlin Radoslavov444b5192014-10-28 10:45:19 -0700214 public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800215 checkNotNull(nodeId, INSTANCE_ID_NULL);
216 checkNotNull(ip, "IP address must not be null");
217 checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
218 ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
219 allNodes.put(node.id(), node);
Madan Jampani7d2fab22015-03-18 17:21:57 -0700220 updateState(nodeId, State.INACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800221 delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
222 return node;
tomee49c372014-09-26 15:14:50 -0700223 }
224
225 @Override
tomb41d1ac2014-09-24 01:51:24 -0700226 public void removeNode(NodeId nodeId) {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800227 checkNotNull(nodeId, INSTANCE_ID_NULL);
228 ControllerNode node = allNodes.remove(nodeId);
229 if (node != null) {
230 nodeStates.remove(nodeId);
231 delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
tomb41d1ac2014-09-24 01:51:24 -0700232 }
233 }
234
Madan Jampani7d2fab22015-03-18 17:21:57 -0700235 private void updateState(NodeId nodeId, State newState) {
236 nodeStates.put(nodeId, newState);
237 nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
238 }
239
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800240 private void establishSelfIdentity() {
241 try {
242 IpAddress ip = findLocalIp();
243 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
244 allNodes.put(localNode.id(), localNode);
Madan Jampani7d2fab22015-03-18 17:21:57 -0700245 updateState(localNode.id(), State.ACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800246 log.info("Local Node: {}", localNode);
247 } catch (SocketException e) {
248 throw new IllegalStateException("Cannot determine local IP", e);
249 }
tom2d7c65f2014-09-23 01:09:35 -0700250 }
251
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800252 private void heartbeat() {
253 try {
254 Set<ControllerNode> peers = allNodes.values()
255 .stream()
256 .filter(node -> !(node.id().equals(localNode.id())))
257 .collect(Collectors.toSet());
258 byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
259 peers.forEach((node) -> {
260 heartbeatToPeer(hbMessagePayload, node);
261 State currentState = nodeStates.get(node.id());
262 double phi = failureDetector.phi(node.id());
263 if (phi >= PHI_FAILURE_THRESHOLD) {
264 if (currentState == State.ACTIVE) {
Madan Jampani7d2fab22015-03-18 17:21:57 -0700265 updateState(node.id(), State.INACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800266 notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
267 }
268 } else {
269 if (currentState == State.INACTIVE) {
Madan Jampani7d2fab22015-03-18 17:21:57 -0700270 updateState(node.id(), State.ACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800271 notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
272 }
273 }
274 });
275 } catch (Exception e) {
276 log.debug("Failed to send heartbeat", e);
277 }
tomb41d1ac2014-09-24 01:51:24 -0700278 }
279
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800280 private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
281 ControllerNode node = allNodes.get(nodeId);
282 if (newState == State.ACTIVE) {
283 delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
284 } else {
285 delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
286 }
tomb41d1ac2014-09-24 01:51:24 -0700287 }
288
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800289 private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
290 Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
291 try {
292 messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
293 } catch (IOException e) {
Jonathan Hart4a4d18f2015-03-26 12:16:16 -0700294 log.trace("Sending heartbeat to {} failed", remoteEp, e);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800295 }
296 }
297
298 private IpAddress findLocalIp() throws SocketException {
299 Enumeration<NetworkInterface> interfaces =
300 NetworkInterface.getNetworkInterfaces();
301 while (interfaces.hasMoreElements()) {
302 NetworkInterface iface = interfaces.nextElement();
303 Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
304 while (inetAddresses.hasMoreElements()) {
305 IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
306 if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
307 return ip;
308 }
309 }
310 }
311 throw new IllegalStateException("Unable to determine local ip");
312 }
313
314 private class HeartbeatMessageHandler implements MessageHandler {
tomb41d1ac2014-09-24 01:51:24 -0700315 @Override
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800316 public void handle(Message message) throws IOException {
317 HeartbeatMessage hb = SERIALIZER.decode(message.payload());
318 failureDetector.report(hb.source().id());
319 hb.knownPeers().forEach(node -> {
320 allNodes.put(node.id(), node);
321 });
tomb41d1ac2014-09-24 01:51:24 -0700322 }
tom2d7c65f2014-09-23 01:09:35 -0700323 }
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800324
325 private static class HeartbeatMessage {
326 private ControllerNode source;
327 private Set<ControllerNode> knownPeers;
328
329 public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
330 this.source = source;
331 this.knownPeers = ImmutableSet.copyOf(members);
332 }
333
334 public ControllerNode source() {
335 return source;
336 }
337
338 public Set<ControllerNode> knownPeers() {
339 return knownPeers;
340 }
341 }
342
Madan Jampani7d2fab22015-03-18 17:21:57 -0700343 @Override
344 public DateTime getLastUpdated(NodeId nodeId) {
345 return nodeStateLastUpdatedTimes.get(nodeId);
346 }
Jonathan Hart4a4d18f2015-03-26 12:16:16 -0700347}