blob: c81079d2b12ffc2bfc45e285288417000044ddb7 [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.impl;
tom2d7c65f2014-09-23 01:09:35 -070017
Jonathan Hart4a4d18f2015-03-26 12:16:16 -070018import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Maps;
Thomas Vachuskade563cf2015-04-01 00:28:50 -070020import com.google.common.collect.Sets;
Jonathan Hart4a4d18f2015-03-26 12:16:16 -070021import com.hazelcast.util.AddressUtil;
Madan Jampanic26eede2015-04-16 11:42:16 -070022
tom2d7c65f2014-09-23 01:09:35 -070023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
tom2d7c65f2014-09-23 01:09:35 -070026import org.apache.felix.scr.annotations.Service;
Madan Jampani7d2fab22015-03-18 17:21:57 -070027import org.joda.time.DateTime;
Ayaka Koshibedd91b842015-03-02 14:48:47 -080028import org.onlab.netty.NettyMessagingService;
29import org.onlab.packet.IpAddress;
30import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080031import org.onosproject.cluster.ClusterEvent;
32import org.onosproject.cluster.ClusterStore;
33import org.onosproject.cluster.ClusterStoreDelegate;
34import org.onosproject.cluster.ControllerNode;
Ayaka Koshibedd91b842015-03-02 14:48:47 -080035import org.onosproject.cluster.ControllerNode.State;
Jonathan Hart4a4d18f2015-03-26 12:16:16 -070036import org.onosproject.cluster.DefaultControllerNode;
Brian O'Connorabafb502014-12-02 22:26:20 -080037import org.onosproject.cluster.NodeId;
Ayaka Koshibedd91b842015-03-02 14:48:47 -080038import org.onosproject.store.AbstractStore;
Madan Jampanic26eede2015-04-16 11:42:16 -070039import org.onosproject.store.cluster.messaging.Endpoint;
Thomas Vachuskade563cf2015-04-01 00:28:50 -070040import org.onosproject.store.consistent.impl.DatabaseDefinition;
41import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
Ayaka Koshibedd91b842015-03-02 14:48:47 -080042import org.onosproject.store.serializers.KryoNamespaces;
43import org.onosproject.store.serializers.KryoSerializer;
44import org.slf4j.Logger;
tom2d7c65f2014-09-23 01:09:35 -070045
Jonathan Hart4a4d18f2015-03-26 12:16:16 -070046import java.io.File;
47import java.io.IOException;
48import java.net.InetAddress;
49import java.net.NetworkInterface;
50import java.net.SocketException;
51import java.util.Enumeration;
52import java.util.Map;
53import java.util.Set;
54import java.util.concurrent.ExecutorService;
55import java.util.concurrent.Executors;
56import java.util.concurrent.ScheduledExecutorService;
57import java.util.concurrent.TimeUnit;
Madan Jampanic26eede2015-04-16 11:42:16 -070058import java.util.function.Consumer;
Jonathan Hart4a4d18f2015-03-26 12:16:16 -070059import java.util.stream.Collectors;
60
Jonathan Hart4a4d18f2015-03-26 12:16:16 -070061import static com.google.common.base.Preconditions.checkNotNull;
Thomas Vachuskade563cf2015-04-01 00:28:50 -070062import static com.hazelcast.util.AddressUtil.matchInterface;
Thomas Vachuska8dc1a692015-03-31 01:01:37 -070063import static java.net.NetworkInterface.getNetworkInterfaces;
64import static java.util.Collections.list;
Jonathan Hart4a4d18f2015-03-26 12:16:16 -070065import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuskade563cf2015-04-01 00:28:50 -070066import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
67import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
Jonathan Hart4a4d18f2015-03-26 12:16:16 -070068import static org.slf4j.LoggerFactory.getLogger;
tom2d7c65f2014-09-23 01:09:35 -070069
tom2d7c65f2014-09-23 01:09:35 -070070@Component(immediate = true)
71@Service
Ayaka Koshibedd91b842015-03-02 14:48:47 -080072/**
73 * Distributed cluster nodes store that employs an accrual failure
74 * detector to identify cluster member up/down status.
75 */
tom0755a362014-09-24 11:54:43 -070076public class DistributedClusterStore
Ayaka Koshibedd91b842015-03-02 14:48:47 -080077 extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
tomb41d1ac2014-09-24 01:51:24 -070078 implements ClusterStore {
tom2d7c65f2014-09-23 01:09:35 -070079
Thomas Vachuska8dc1a692015-03-31 01:01:37 -070080 private static final Logger log = getLogger(DistributedClusterStore.class);
tom2d7c65f2014-09-23 01:09:35 -070081
Thomas Vachuskade563cf2015-04-01 00:28:50 -070082 public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
83 public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
84
Ayaka Koshibedd91b842015-03-02 14:48:47 -080085 // TODO: make these configurable.
86 private static final int HEARTBEAT_FD_PORT = 2419;
87 private static final int HEARTBEAT_INTERVAL_MS = 100;
88 private static final int PHI_FAILURE_THRESHOLD = 10;
tom2d7c65f2014-09-23 01:09:35 -070089
Ayaka Koshibedd91b842015-03-02 14:48:47 -080090 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
91 @Override
92 protected void setupKryoPool() {
93 serializerPool = KryoNamespace.newBuilder()
Thomas Vachuska8dc1a692015-03-31 01:01:37 -070094 .register(KryoNamespaces.API)
95 .register(HeartbeatMessage.class)
96 .build()
97 .populate(1);
Ayaka Koshibedd91b842015-03-02 14:48:47 -080098 }
99 };
100
101 private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700102 private static final byte SITE_LOCAL_BYTE = (byte) 0xC0;
103 private static final String ONOS_NIC = "ONOS_NIC";
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800104
105 private ClusterDefinition clusterDefinition;
106
107 private Set<ControllerNode> seedNodes;
108 private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
109 private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
Madan Jampani7d2fab22015-03-18 17:21:57 -0700110 private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
Madan Jampanic26eede2015-04-16 11:42:16 -0700111 private NettyMessagingService messagingService;
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800112 private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
113 groupedThreads("onos/cluster/membership", "heartbeat-sender"));
114 private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
115 groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
116
117 private PhiAccrualFailureDetector failureDetector;
118
119 private ControllerNode localNode;
120
tom2d7c65f2014-09-23 01:09:35 -0700121 @Activate
122 public void activate() {
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700123 File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700124 ClusterDefinitionStore clusterDefinitionStore =
125 new ClusterDefinitionStore(clusterDefinitionFile.getPath());
126
127 if (!clusterDefinitionFile.exists()) {
128 createDefaultClusterDefinition(clusterDefinitionStore);
129 }
tom2d7c65f2014-09-23 01:09:35 -0700130
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800131 try {
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700132 clusterDefinition = clusterDefinitionStore.read();
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800133 seedNodes = ImmutableSet
134 .copyOf(clusterDefinition.getNodes())
135 .stream()
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700136 .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
137 IpAddress.valueOf(n.getIp()),
138 n.getTcpPort()))
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800139 .collect(Collectors.toSet());
140 } catch (IOException e) {
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700141 throw new IllegalStateException("Failed to read cluster definition.", e);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800142 }
tomb41d1ac2014-09-24 01:51:24 -0700143
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800144 seedNodes.forEach(node -> {
145 allNodes.put(node.id(), node);
Madan Jampani7d2fab22015-03-18 17:21:57 -0700146 updateState(node.id(), State.INACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800147 });
148
149 establishSelfIdentity();
150
151 messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800152 try {
153 messagingService.activate();
154 } catch (InterruptedException e) {
155 Thread.currentThread().interrupt();
156 throw new IllegalStateException(
157 "Failed to cleanly initialize membership and"
158 + " failure detector communication channel.", e);
159 }
160 messagingService.registerHandler(HEARTBEAT_MESSAGE,
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700161 new HeartbeatMessageHandler(), heartBeatMessageHandler);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800162
163 failureDetector = new PhiAccrualFailureDetector();
164
165 heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700166 HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
tomb41d1ac2014-09-24 01:51:24 -0700167
168 log.info("Started");
169 }
170
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700171 private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
172 // Assumes IPv4 is returned.
173 String ip = DistributedClusterStore.getSiteLocalAddress();
174 String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
175 NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
176 try {
177 store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
178 } catch (IOException e) {
179 log.warn("Unable to write default cluster definition", e);
180 }
181 }
182
183 /**
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700184 * Returns the address that matches the IP prefix given in ONOS_NIC
185 * environment variable if one was specified, or the first site local
186 * address if one can be found or the loopback address otherwise.
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700187 *
188 * @return site-local address in string form
189 */
190 public static String getSiteLocalAddress() {
191 try {
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700192 String ipPrefix = System.getenv(ONOS_NIC);
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700193 for (NetworkInterface nif : list(getNetworkInterfaces())) {
194 for (InetAddress address : list(nif.getInetAddresses())) {
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700195 IpAddress ip = IpAddress.valueOf(address);
196 if (ipPrefix == null && address.isSiteLocalAddress() ||
197 ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
198 return ip.toString();
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700199 }
200 }
201 }
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700202
203 } catch (SocketException e) {
204 log.error("Unable to get network interfaces", e);
205 }
206
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700207 return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700208 }
209
tom2d7c65f2014-09-23 01:09:35 -0700210 @Deactivate
211 public void deactivate() {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800212 try {
213 messagingService.deactivate();
214 } catch (Exception e) {
215 log.trace("Failed to cleanly shutdown cluster membership messaging", e);
216 }
217
218 heartBeatSender.shutdownNow();
219 heartBeatMessageHandler.shutdownNow();
220
tom2d7c65f2014-09-23 01:09:35 -0700221 log.info("Stopped");
222 }
223
224 @Override
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800225 public void setDelegate(ClusterStoreDelegate delegate) {
226 checkNotNull(delegate, "Delegate cannot be null");
227 this.delegate = delegate;
228 }
229
230 @Override
231 public void unsetDelegate(ClusterStoreDelegate delegate) {
232 this.delegate = null;
233 }
234
235 @Override
236 public boolean hasDelegate() {
237 return this.delegate != null;
238 }
239
240 @Override
tom2d7c65f2014-09-23 01:09:35 -0700241 public ControllerNode getLocalNode() {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800242 return localNode;
tom2d7c65f2014-09-23 01:09:35 -0700243 }
244
245 @Override
246 public Set<ControllerNode> getNodes() {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800247 return ImmutableSet.copyOf(allNodes.values());
tom2d7c65f2014-09-23 01:09:35 -0700248 }
249
250 @Override
251 public ControllerNode getNode(NodeId nodeId) {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800252 checkNotNull(nodeId, INSTANCE_ID_NULL);
253 return allNodes.get(nodeId);
tom2d7c65f2014-09-23 01:09:35 -0700254 }
255
256 @Override
tomb41d1ac2014-09-24 01:51:24 -0700257 public State getState(NodeId nodeId) {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800258 checkNotNull(nodeId, INSTANCE_ID_NULL);
259 return nodeStates.get(nodeId);
tomb41d1ac2014-09-24 01:51:24 -0700260 }
261
262 @Override
Pavlin Radoslavov444b5192014-10-28 10:45:19 -0700263 public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800264 ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
265 allNodes.put(node.id(), node);
Madan Jampani7d2fab22015-03-18 17:21:57 -0700266 updateState(nodeId, State.INACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800267 delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
268 return node;
tomee49c372014-09-26 15:14:50 -0700269 }
270
271 @Override
tomb41d1ac2014-09-24 01:51:24 -0700272 public void removeNode(NodeId nodeId) {
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800273 checkNotNull(nodeId, INSTANCE_ID_NULL);
274 ControllerNode node = allNodes.remove(nodeId);
275 if (node != null) {
276 nodeStates.remove(nodeId);
277 delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_REMOVED, node));
tomb41d1ac2014-09-24 01:51:24 -0700278 }
279 }
280
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700281 @Override
282 public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
283 try {
284 Set<NodeInfo> infos = Sets.newHashSet();
285 nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
286 n.ip().toString(),
287 n.tcpPort())));
288
289 ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
290 new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
291
292 DatabaseDefinition ddef = DatabaseDefinition.from(infos);
293 new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
294 } catch (IOException e) {
295 log.error("Unable to form cluster", e);
296 }
297 }
298
Madan Jampani7d2fab22015-03-18 17:21:57 -0700299 private void updateState(NodeId nodeId, State newState) {
300 nodeStates.put(nodeId, newState);
301 nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
302 }
303
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800304 private void establishSelfIdentity() {
305 try {
306 IpAddress ip = findLocalIp();
307 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
308 allNodes.put(localNode.id(), localNode);
Madan Jampani7d2fab22015-03-18 17:21:57 -0700309 updateState(localNode.id(), State.ACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800310 log.info("Local Node: {}", localNode);
311 } catch (SocketException e) {
312 throw new IllegalStateException("Cannot determine local IP", e);
313 }
tom2d7c65f2014-09-23 01:09:35 -0700314 }
315
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800316 private void heartbeat() {
317 try {
318 Set<ControllerNode> peers = allNodes.values()
319 .stream()
320 .filter(node -> !(node.id().equals(localNode.id())))
321 .collect(Collectors.toSet());
322 byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, peers));
323 peers.forEach((node) -> {
324 heartbeatToPeer(hbMessagePayload, node);
325 State currentState = nodeStates.get(node.id());
326 double phi = failureDetector.phi(node.id());
327 if (phi >= PHI_FAILURE_THRESHOLD) {
328 if (currentState == State.ACTIVE) {
Madan Jampani7d2fab22015-03-18 17:21:57 -0700329 updateState(node.id(), State.INACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800330 notifyStateChange(node.id(), State.ACTIVE, State.INACTIVE);
331 }
332 } else {
333 if (currentState == State.INACTIVE) {
Madan Jampani7d2fab22015-03-18 17:21:57 -0700334 updateState(node.id(), State.ACTIVE);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800335 notifyStateChange(node.id(), State.INACTIVE, State.ACTIVE);
336 }
337 }
338 });
339 } catch (Exception e) {
340 log.debug("Failed to send heartbeat", e);
341 }
tomb41d1ac2014-09-24 01:51:24 -0700342 }
343
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800344 private void notifyStateChange(NodeId nodeId, State oldState, State newState) {
345 ControllerNode node = allNodes.get(nodeId);
346 if (newState == State.ACTIVE) {
347 delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_ACTIVATED, node));
348 } else {
349 delegate.notify(new ClusterEvent(ClusterEvent.Type.INSTANCE_DEACTIVATED, node));
350 }
tomb41d1ac2014-09-24 01:51:24 -0700351 }
352
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800353 private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
354 Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
355 try {
356 messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
357 } catch (IOException e) {
Jonathan Hart4a4d18f2015-03-26 12:16:16 -0700358 log.trace("Sending heartbeat to {} failed", remoteEp, e);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800359 }
360 }
361
362 private IpAddress findLocalIp() throws SocketException {
363 Enumeration<NetworkInterface> interfaces =
364 NetworkInterface.getNetworkInterfaces();
365 while (interfaces.hasMoreElements()) {
366 NetworkInterface iface = interfaces.nextElement();
Thomas Vachuska8dc1a692015-03-31 01:01:37 -0700367 Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800368 while (inetAddresses.hasMoreElements()) {
369 IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
370 if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
371 return ip;
372 }
373 }
374 }
375 throw new IllegalStateException("Unable to determine local ip");
376 }
377
Madan Jampanic26eede2015-04-16 11:42:16 -0700378 private class HeartbeatMessageHandler implements Consumer<byte[]> {
tomb41d1ac2014-09-24 01:51:24 -0700379 @Override
Madan Jampanic26eede2015-04-16 11:42:16 -0700380 public void accept(byte[] message) {
381 HeartbeatMessage hb = SERIALIZER.decode(message);
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800382 failureDetector.report(hb.source().id());
383 hb.knownPeers().forEach(node -> {
384 allNodes.put(node.id(), node);
385 });
tomb41d1ac2014-09-24 01:51:24 -0700386 }
tom2d7c65f2014-09-23 01:09:35 -0700387 }
Ayaka Koshibedd91b842015-03-02 14:48:47 -0800388
389 private static class HeartbeatMessage {
390 private ControllerNode source;
391 private Set<ControllerNode> knownPeers;
392
393 public HeartbeatMessage(ControllerNode source, Set<ControllerNode> members) {
394 this.source = source;
395 this.knownPeers = ImmutableSet.copyOf(members);
396 }
397
398 public ControllerNode source() {
399 return source;
400 }
401
402 public Set<ControllerNode> knownPeers() {
403 return knownPeers;
404 }
405 }
406
Madan Jampani7d2fab22015-03-18 17:21:57 -0700407 @Override
408 public DateTime getLastUpdated(NodeId nodeId) {
409 return nodeStateLastUpdatedTimes.get(nodeId);
410 }
Thomas Vachuskade563cf2015-04-01 00:28:50 -0700411
Jonathan Hart4a4d18f2015-03-26 12:16:16 -0700412}