blob: 1ee7daa4a0f5624533d29c82d54a3b4c13f10c3e [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.routeservice.impl;

import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.routeservice.ResolvedRoute;
import org.onosproject.routeservice.Route;
import org.onosproject.routeservice.RouteAdminService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncDistributedLock;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WorkQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;

45/**
46 * Monitors cluster nodes and removes routes if a cluster node becomes unavailable.
47 */
48public class RouteMonitor {
49
50 private final Logger log = LoggerFactory.getLogger(this.getClass());
51
52 private static final String TOPIC = "route-reaper";
Charles Chan0cc44502018-01-29 15:25:52 -080053 private static final String LOCK_NAME = "route-monitor-lock";
Jonathan Hartd4be52f2017-05-25 14:21:44 -070054 private static final int NUM_PARALLEL_JOBS = 10;
55
56 private RouteAdminService routeService;
57 private final ClusterService clusterService;
58 private StorageService storageService;
59
Charles Chan0cc44502018-01-29 15:25:52 -080060 private final AsyncDistributedLock asyncLock;
61
Jonathan Hartd4be52f2017-05-25 14:21:44 -070062 private WorkQueue<NodeId> queue;
63
64 private final InternalClusterListener clusterListener = new InternalClusterListener();
65
66 private final ScheduledExecutorService reaperExecutor =
67 newSingleThreadScheduledExecutor(groupedThreads("route/reaper", "", log));
68
pierventre52ef9332021-07-09 22:42:17 +020069 private final ExecutorService eventExecutor = newSingleThreadScheduledExecutor(groupedThreads(
70 "onos/routemonitor", "events-%d", log));
71
Jonathan Hartd4be52f2017-05-25 14:21:44 -070072 /**
73 * Creates a new route monitor.
74 *
75 * @param routeService route service
76 * @param clusterService cluster service
77 * @param storageService storage service
78 */
79 public RouteMonitor(RouteAdminService routeService,
80 ClusterService clusterService, StorageService storageService) {
81 this.routeService = routeService;
82 this.clusterService = clusterService;
83 this.storageService = storageService;
84
Charles Chan0cc44502018-01-29 15:25:52 -080085 asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
86
Jonathan Hartd4be52f2017-05-25 14:21:44 -070087 clusterService.addListener(clusterListener);
88
89 queue = storageService.getWorkQueue(TOPIC, Serializer.using(KryoNamespaces.API));
90 queue.addStatusChangeListener(this::statusChange);
91
92 startProcessing();
93 }
94
95 /**
96 * Shuts down the route monitor.
97 */
98 public void shutdown() {
99 stopProcessing();
100 clusterService.removeListener(clusterListener);
pierventre52ef9332021-07-09 22:42:17 +0200101 eventExecutor.shutdownNow();
102 reaperExecutor.shutdownNow();
Charles Chan0cc44502018-01-29 15:25:52 -0800103 asyncLock.unlock();
Jonathan Hartd4be52f2017-05-25 14:21:44 -0700104 }
105
106 private void statusChange(DistributedPrimitive.Status status) {
107 switch (status) {
108 case ACTIVE:
109 startProcessing();
110 break;
111 case SUSPENDED:
112 stopProcessing();
113 break;
114 case INACTIVE:
115 default:
116 break;
117 }
118 }
119
120 private void startProcessing() {
121 queue.registerTaskProcessor(this::cleanRoutes, NUM_PARALLEL_JOBS, reaperExecutor);
122 }
123
124 private void stopProcessing() {
125 queue.stopProcessing();
126 }
127
128 private void cleanRoutes(NodeId node) {
129 log.info("Cleaning routes from unavailable node {}", node);
Jonathan Hartd4be52f2017-05-25 14:21:44 -0700130 Collection<Route> routes = routeService.getRouteTables().stream()
131 .flatMap(id -> routeService.getRoutes(id).stream())
132 .flatMap(route -> route.allRoutes().stream())
133 .map(ResolvedRoute::route)
134 .filter(r -> r.sourceNode().equals(node))
135 .collect(Collectors.toList());
Charles Chan0cc44502018-01-29 15:25:52 -0800136 if (node.equals(clusterService.getLocalNode().id())) {
137 log.debug("Do not remove routes from local nodes {}", node);
138 return;
139 }
140
141 if (clusterService.getState(node) == ControllerNode.State.READY) {
142 log.debug("Do not remove routes from active nodes {}", node);
143 return;
144 }
Jonathan Hartd4be52f2017-05-25 14:21:44 -0700145
146 log.debug("Withdrawing routes: {}", routes);
Jonathan Hartd4be52f2017-05-25 14:21:44 -0700147 routeService.withdraw(routes);
148 }
149
150 private class InternalClusterListener implements ClusterEventListener {
151
152 @Override
153 public void event(ClusterEvent event) {
pierventre52ef9332021-07-09 22:42:17 +0200154 eventExecutor.execute(() -> {
pierventree73a7272021-11-09 20:06:01 +0100155 if (event.instanceType() == ClusterEvent.InstanceType.STORAGE) {
156 log.debug("Skipping cluster event for {}", event.subject().id().id());
157 return;
158 }
159
pierventre52ef9332021-07-09 22:42:17 +0200160 switch (event.type()) {
161 case INSTANCE_DEACTIVATED:
162 NodeId id = event.subject().id();
163 log.info("Node {} deactivated", id);
Charles Chan0cc44502018-01-29 15:25:52 -0800164
pierventre52ef9332021-07-09 22:42:17 +0200165 // DistributedLock is introduced to guarantee that minority nodes won't try to remove
166 // routes that originated from majority nodes.
167 // Adding 15 seconds retry for the leadership election to be completed.
168 asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
169 if (result != null && result.isPresent()) {
170 log.debug("Lock obtained. Put {} into removal queue", id);
171 queue.addOne(id);
172 asyncLock.unlock();
173 } else {
174 log.debug("Fail to obtain lock. Do not remove routes from {}", id);
175 }
176 });
177 break;
178 case INSTANCE_ADDED:
179 case INSTANCE_REMOVED:
180 case INSTANCE_ACTIVATED:
181 case INSTANCE_READY:
182 default:
183 break;
184 }
185 });
Jonathan Hartd4be52f2017-05-25 14:21:44 -0700186 }
187 }
Jonathan Hartd4be52f2017-05-25 14:21:44 -0700188}