/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.cluster.impl;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.hazelcast.util.AddressUtil;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.NettyMessagingService;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.ControllerNode.State;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.consistent.impl.DatabaseDefinition;
import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.hazelcast.util.AddressUtil.matchInterface;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
import static org.slf4j.LoggerFactory.getLogger;

tom2d7c65f2014-09-23 01:09:35 -070070@Component(immediate = true)
71@Service
Ayaka Koshibedd91b842015-03-02 14:48:47 -080072/**
73 * Distributed cluster nodes store that employs an accrual failure
74 * detector to identify cluster member up/down status.
75 */
tom0755a362014-09-24 11:54:43 -070076public class DistributedClusterStore
Ayaka Koshibedd91b842015-03-02 14:48:47 -080077 extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
tomb41d1ac2014-09-24 01:51:24 -070078 implements ClusterStore {
tom2d7c65f2014-09-23 01:09:35 -070079
Thomas Vachuska8dc1a692015-03-31 01:01:37 -070080 private static final Logger log = getLogger(DistributedClusterStore.class);
tom2d7c65f2014-09-23 01:09:35 -070081
Thomas Vachuskade563cf2015-04-01 00:28:50 -070082 public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
83 public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
84
Ayaka Koshibedd91b842015-03-02 14:48:47 -080085 // TODO: make these configurable.
86 private static final int HEARTBEAT_FD_PORT = 2419;
87 private static final int HEARTBEAT_INTERVAL_MS = 100;
88 private static final int PHI_FAILURE_THRESHOLD = 10;
tom2d7c65f2014-09-23 01:09:35 -070089
Ayaka Koshibedd91b842015-03-02 14:48:47 -080090 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
91 @Override
92 protected void setupKryoPool() {
93 serializerPool = KryoNamespace.newBuilder()
Thomas Vachuska8dc1a692015-03-31 01:01:37 -070094 .register(KryoNamespaces.API)
95 .register(HeartbeatMessage.class)
96 .build()
97 .populate(1);
Ayaka Koshibedd91b842015-03-02 14:48:47 -080098 }
99 };
100
101 private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700102 private static final byte SITE_LOCAL_BYTE = (byte) 0xC0;
103 private static final String ONOS_NIC = "ONOS_NIC";
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800104
105 private ClusterDefinition clusterDefinition;
106
107 private Set<ControllerNode> seedNodes;
108 private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
109 private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
Madan Jampani7d2fab22015-03-18 17:21:57 -0700110 private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800111 private NettyMessagingService messagingService = new NettyMessagingService();
112 private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
113 groupedThreads("onos/cluster/membership", "heartbeat-sender"));
114 private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
115 groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
116
117 private PhiAccrualFailureDetector failureDetector;
118
119 private ControllerNode localNode;
120
tom2d7c65f2014-09-23 01:09:35 -0700121 @Activate
122 public void activate() {
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700123 File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700124 ClusterDefinitionStore clusterDefinitionStore =
125 new ClusterDefinitionStore(clusterDefinitionFile.getPath());
126
127 if (!clusterDefinitionFile.exists()) {
128 createDefaultClusterDefinition(clusterDefinitionStore);
129 }
tom2d7c65f2014-09-23 01:09:35 -0700130
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800131 try {
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700132 clusterDefinition = clusterDefinitionStore.read();
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800133 seedNodes = ImmutableSet
134 .copyOf(clusterDefinition.getNodes())
135 .stream()
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700136 .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
137 IpAddress.valueOf(n.getIp()),
138 n.getTcpPort()))
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800139 .collect(Collectors.toSet());
140 } catch (IOException e) {
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700141 throw new IllegalStateException("Failed to read cluster definition.", e);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800142 }
tomb41d1ac2014-09-24 01:51:24 -0700143
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800144 seedNodes.forEach(node -> {
145 allNodes.put(node.id(), node);
Madan Jampani7d2fab22015-03-18 17:21:57 -0700146 updateState(node.id(), State.INACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800147 });
148
149 establishSelfIdentity();
150
151 messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
152
153 try {
154 messagingService.activate();
155 } catch (InterruptedException e) {
156 Thread.currentThread().interrupt();
157 throw new IllegalStateException(
158 "Failed to cleanly initialize membership and"
159 + " failure detector communication channel.", e);
160 }
161 messagingService.registerHandler(HEARTBEAT_MESSAGE,
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700162 new HeartbeatMessageHandler(), heartBeatMessageHandler);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800163
164 failureDetector = new PhiAccrualFailureDetector();
165
166 heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700167 HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
tomb41d1ac2014-09-24 01:51:24 -0700168
169 log.info("Started");
170 }
171
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700172 private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
173 // Assumes IPv4 is returned.
174 String ip = DistributedClusterStore.getSiteLocalAddress();
175 String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
176 NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
177 try {
178 store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
179 } catch (IOException e) {
180 log.warn("Unable to write default cluster definition", e);
181 }
182 }
183
184 /**
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700185 * Returns the address that matches the IP prefix given in ONOS_NIC
186 * environment variable if one was specified, or the first site local
187 * address if one can be found or the loopback address otherwise.
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700188 *
189 * @return site-local address in string form
190 */
191 public static String getSiteLocalAddress() {
192 try {
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700193 String ipPrefix = System.getenv(ONOS_NIC);
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700194 for (NetworkInterface nif : list(getNetworkInterfaces())) {
195 for (InetAddress address : list(nif.getInetAddresses())) {
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700196 IpAddress ip = IpAddress.valueOf(address);
197 if (ipPrefix == null && address.isSiteLocalAddress() ||
198 ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
199 return ip.toString();
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700200 }
201 }
202 }
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700203
204 } catch (SocketException e) {
205 log.error("Unable to get network interfaces", e);
206 }
207
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700208 return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700209 }
210
tom2d7c65f2014-09-23 01:09:35 -0700211 @Deactivate
212 public void deactivate() {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800213 try {
214 messagingService.deactivate();
215 } catch (Exception e) {
216 log.trace("Failed to cleanly shutdown cluster membership messaging", e);
217 }
218
219 heartBeatSender.shutdownNow();
220 heartBeatMessageHandler.shutdownNow();
221
tom2d7c65f2014-09-23 01:09:35 -0700222 log.info("Stopped");
223 }
224
225 @Override
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800226 public void setDelegate(ClusterStoreDelegate delegate) {
227 checkNotNull(delegate, "Delegate cannot be null");
228 this.delegate = delegate;
229 }
230
231 @Override
232 public void unsetDelegate(ClusterStoreDelegate delegate) {
233 this.delegate = null;
234 }
235
236 @Override
237 public boolean hasDelegate() {
238 return this.delegate != null;
239 }
240
241 @Override
tom2d7c65f2014-09-23 01:09:35 -0700242 public ControllerNode getLocalNode() {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800243 return localNode;
tom2d7c65f2014-09-23 01:09:35 -0700244 }
245
246 @Override
247 public Set<ControllerNode> getNodes() {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800248 return ImmutableSet.copyOf(allNodes.values());
tom2d7c65f2014-09-23 01:09:35 -0700249 }
250
251 @Override
252 public ControllerNode getNode(NodeId nodeId) {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800253 checkNotNull(nodeId, INSTANCE_ID_NULL);
254 return allNodes.get(nodeId);
tom2d7c65f2014-09-23 01:09:35 -0700255 }
256
257 @Override
tomb41d1ac2014-09-24 01:51:24 -0700258 public State getState(NodeId nodeId) {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800259 checkNotNull(nodeId, INSTANCE_ID_NULL);
260 return nodeStates.get(nodeId);
tomb41d1ac2014-09-24 01:51:24 -0700261 }
262
263 @Override
Pavlin Radoslavov444b5192014-10-28 10:45:19 -0700264 public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800265 ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
266 allNodes.put(node.id(), node);
Madan Jampani7d2fab22015-03-18 17:21:57 -0700267 updateState(nodeId, State.INACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800268 delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
269 return node;
tomee49c372014-09-26 15:14:50 -0700270 }
271
272 @Override
tomb41d1ac2014-09-24 01:51:24 -0700273 public void removeNode(NodeId nodeId) {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800274 checkNotNull(nodeId, INSTANCE_ID_NULL);
275 ControllerNode node = allNodes.remove(nodeId);
276 if (node != null) {
277 nodeStates.remove(nodeId);
278 delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
tomb41d1ac2014-09-24 01:51:24 -0700279 }
280 }
281
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700282 @Override
283 public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
284 try {
285 Set<NodeInfo> infos = Sets.newHashSet();
286 nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
287 n.ip().toString(),
288 n.tcpPort())));
289
290 ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
291 new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
292
293 DatabaseDefinition ddef = DatabaseDefinition.from(infos);
294 new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
295 } catch (IOException e) {
296 log.error("Unable to form cluster", e);
297 }
298 }
299
Madan Jampani7d2fab22015-03-18 17:21:57 -0700300 private void updateState(NodeId nodeId, State newState) {
301 nodeStates.put(nodeId, newState);
302 nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
303 }
304
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800305 private void establishSelfIdentity() {
306 try {
307 IpAddress ip = findLocalIp();
308 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
309 allNodes.put(localNode.id(), localNode);
Madan Jampani7d2fab22015-03-18 17:21:57 -0700310 updateState(localNode.id(), State.ACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800311 log.info("Local Node: {}", localNode);
312 } catch (SocketException e) {
313 throw new IllegalStateException("Cannot determine local IP", e);
314 }
tom2d7c65f2014-09-23 01:09:35 -0700315 }
316
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800317 private void heartbeat() {
318 try {
319 Set<ControllerNode> peers = allNodes.values()
320 .stream()
321 .filter(node -> !(node.id().equals(localNode.id())))
322 .collect(Collectors.toSet());
323 byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
324 peers.forEach((node) -> {
325 heartbeatToPeer(hbMessagePayload, node);
326 State currentState = nodeStates.get(node.id());
327 double phi = failureDetector.phi(node.id());
328 if (phi >= PHI_FAILURE_THRESHOLD) {
329 if (currentState == State.ACTIVE) {
Madan Jampani7d2fab22015-03-18 17:21:57 -0700330 updateState(node.id(), State.INACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800331 notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
332 }
333 } else {
334 if (currentState == State.INACTIVE) {
Madan Jampani7d2fab22015-03-18 17:21:57 -0700335 updateState(node.id(), State.ACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800336 notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
337 }
338 }
339 });
340 } catch (Exception e) {
341 log.debug("Failed to send heartbeat", e);
342 }
tomb41d1ac2014-09-24 01:51:24 -0700343 }
344
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800345 private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
346 ControllerNode node = allNodes.get(nodeId);
347 if (newState == State.ACTIVE) {
348 delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
349 } else {
350 delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
351 }
tomb41d1ac2014-09-24 01:51:24 -0700352 }
353
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800354 private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
355 Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
356 try {
357 messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
358 } catch (IOException e) {
Jonathan Hart4a4d18f2015-03-26 12:16:16 -0700359 log.trace("Sending heartbeat to {} failed", remoteEp, e);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800360 }
361 }
362
363 private IpAddress findLocalIp() throws SocketException {
364 Enumeration<NetworkInterface> interfaces =
365 NetworkInterface.getNetworkInterfaces();
366 while (interfaces.hasMoreElements()) {
367 NetworkInterface iface = interfaces.nextElement();
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700368 Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800369 while (inetAddresses.hasMoreElements()) {
370 IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
371 if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
372 return ip;
373 }
374 }
375 }
376 throw new IllegalStateException("Unable to determine local ip");
377 }
378
379 private class HeartbeatMessageHandler implements MessageHandler {
tomb41d1ac2014-09-24 01:51:24 -0700380 @Override
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800381 public void handle(Message message) throws IOException {
382 HeartbeatMessage hb = SERIALIZER.decode(message.payload());
383 failureDetector.report(hb.source().id());
384 hb.knownPeers().forEach(node -> {
385 allNodes.put(node.id(), node);
386 });
tomb41d1ac2014-09-24 01:51:24 -0700387 }
tom2d7c65f2014-09-23 01:09:35 -0700388 }
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800389
390 private static class HeartbeatMessage {
391 private ControllerNode source;
392 private Set<ControllerNode> knownPeers;
393
394 public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
395 this.source = source;
396 this.knownPeers = ImmutableSet.copyOf(members);
397 }
398
399 public ControllerNode source() {
400 return source;
401 }
402
403 public Set<ControllerNode> knownPeers() {
404 return knownPeers;
405 }
406 }
407
Madan Jampani7d2fab22015-03-18 17:21:57 -0700408 @Override
409 public DateTime getLastUpdated(NodeId nodeId) {
410 return nodeStateLastUpdatedTimes.get(nodeId);
411 }
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700412
Jonathan Hart4a4d18f2015-03-26 12:16:16 -0700413}