/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.routeservice.impl;

import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.routeservice.ResolvedRoute;
import org.onosproject.routeservice.Route;
import org.onosproject.routeservice.RouteAdminService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncDistributedLock;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WorkQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;

44/**
45 * Monitors cluster nodes and removes routes if a cluster node becomes unavailable.
46 */
47public class RouteMonitor {
48
49 private final Logger log = LoggerFactory.getLogger(this.getClass());
50
51 private static final String TOPIC = "route-reaper";
Charles Chan3a35e142018-01-29 15:25:52 -080052 private static final String LOCK_NAME = "route-monitor-lock";
Jonathan Hartd4be52f2017-05-25 14:21:44 -070053 private static final int NUM_PARALLEL_JOBS = 10;
54
55 private RouteAdminService routeService;
56 private final ClusterService clusterService;
57 private StorageService storageService;
58
Charles Chan3a35e142018-01-29 15:25:52 -080059 private final AsyncDistributedLock asyncLock;
60
Jonathan Hartd4be52f2017-05-25 14:21:44 -070061 private WorkQueue<NodeId> queue;
62
63 private final InternalClusterListener clusterListener = new InternalClusterListener();
64
65 private final ScheduledExecutorService reaperExecutor =
66 newSingleThreadScheduledExecutor(groupedThreads("route/reaper", "", log));
67
68 /**
69 * Creates a new route monitor.
70 *
71 * @param routeService route service
72 * @param clusterService cluster service
73 * @param storageService storage service
74 */
75 public RouteMonitor(RouteAdminService routeService,
76 ClusterService clusterService, StorageService storageService) {
77 this.routeService = routeService;
78 this.clusterService = clusterService;
79 this.storageService = storageService;
80
Charles Chan3a35e142018-01-29 15:25:52 -080081 asyncLock = storageService.lockBuilder().withName(LOCK_NAME).build();
82
Jonathan Hartd4be52f2017-05-25 14:21:44 -070083 clusterService.addListener(clusterListener);
84
85 queue = storageService.getWorkQueue(TOPIC, Serializer.using(KryoNamespaces.API));
86 queue.addStatusChangeListener(this::statusChange);
87
88 startProcessing();
89 }
90
91 /**
92 * Shuts down the route monitor.
93 */
94 public void shutdown() {
95 stopProcessing();
96 clusterService.removeListener(clusterListener);
Charles Chan3a35e142018-01-29 15:25:52 -080097 asyncLock.unlock();
Jonathan Hartd4be52f2017-05-25 14:21:44 -070098 }
99
100 private void statusChange(DistributedPrimitive.Status status) {
101 switch (status) {
102 case ACTIVE:
103 startProcessing();
104 break;
105 case SUSPENDED:
106 stopProcessing();
107 break;
108 case INACTIVE:
109 default:
110 break;
111 }
112 }
113
114 private void startProcessing() {
115 queue.registerTaskProcessor(this::cleanRoutes, NUM_PARALLEL_JOBS, reaperExecutor);
116 }
117
118 private void stopProcessing() {
119 queue.stopProcessing();
120 }
121
122 private void cleanRoutes(NodeId node) {
123 log.info("Cleaning routes from unavailable node {}", node);
Jonathan Hartd4be52f2017-05-25 14:21:44 -0700124 Collection<Route> routes = routeService.getRouteTables().stream()
125 .flatMap(id -> routeService.getRoutes(id).stream())
126 .flatMap(route -> route.allRoutes().stream())
127 .map(ResolvedRoute::route)
128 .filter(r -> r.sourceNode().equals(node))
129 .collect(Collectors.toList());
Charles Chan3a35e142018-01-29 15:25:52 -0800130 if (node.equals(clusterService.getLocalNode().id())) {
131 log.debug("Do not remove routes from local nodes {}", node);
132 return;
133 }
134
135 if (clusterService.getState(node) == ControllerNode.State.READY) {
136 log.debug("Do not remove routes from active nodes {}", node);
137 return;
138 }
Jonathan Hartd4be52f2017-05-25 14:21:44 -0700139
140 log.debug("Withdrawing routes: {}", routes);
Jonathan Hartd4be52f2017-05-25 14:21:44 -0700141 routeService.withdraw(routes);
142 }
143
144 private class InternalClusterListener implements ClusterEventListener {
145
146 @Override
147 public void event(ClusterEvent event) {
148 switch (event.type()) {
149 case INSTANCE_DEACTIVATED:
150 NodeId id = event.subject().id();
151 log.info("Node {} deactivated", id);
Charles Chan3a35e142018-01-29 15:25:52 -0800152
153 // DistributedLock is introduced to guarantee that minority nodes won't try to remove
154 // routes that originated from majority nodes.
155 // Adding 15 seconds retry for the leadership election to be completed.
156 asyncLock.tryLock(Duration.ofSeconds(15)).whenComplete((result, error) -> {
157 if (result != null && result.isPresent()) {
158 log.debug("Lock obtained. Put {} into removal queue", id);
159 queue.addOne(id);
160 asyncLock.unlock();
161 } else {
162 log.debug("Fail to obtain lock. Do not remove routes from {}", id);
163 }
164 });
Jonathan Hartd4be52f2017-05-25 14:21:44 -0700165 break;
166 case INSTANCE_ADDED:
167 case INSTANCE_REMOVED:
168 case INSTANCE_ACTIVATED:
169 case INSTANCE_READY:
170 default:
171 break;
172 }
173 }
174 }
Jonathan Hartd4be52f2017-05-25 14:21:44 -0700175}