blob: 24d2aff3ec5264bffc5cd225ab7efcc0b9539153 [file] [log] [blame]
Jonathan Hartd4be52f2017-05-25 14:21:44 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Jonathan Hartd4be52f2017-05-25 14:21:44 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Ray Milkey69ec8712017-08-08 13:00:43 -070017package org.onosproject.routeservice.impl;
Jonathan Hartd4be52f2017-05-25 14:21:44 -070018
19import org.onosproject.cluster.ClusterEvent;
20import org.onosproject.cluster.ClusterEventListener;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.NodeId;
Ray Milkey69ec8712017-08-08 13:00:43 -070023import org.onosproject.routeservice.ResolvedRoute;
24import org.onosproject.routeservice.Route;
25import org.onosproject.routeservice.RouteAdminService;
Jonathan Hartd4be52f2017-05-25 14:21:44 -070026import org.onosproject.store.serializers.KryoNamespaces;
27import org.onosproject.store.service.DistributedPrimitive;
28import org.onosproject.store.service.Serializer;
29import org.onosproject.store.service.StorageService;
30import org.onosproject.store.service.WorkQueue;
31import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34import java.util.Collection;
35import java.util.concurrent.ScheduledExecutorService;
36import java.util.stream.Collectors;
37
38import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
39import static org.onlab.util.Tools.groupedThreads;
40
41/**
42 * Monitors cluster nodes and removes routes if a cluster node becomes unavailable.
43 */
44public class RouteMonitor {
45
46 private final Logger log = LoggerFactory.getLogger(this.getClass());
47
48 private static final String TOPIC = "route-reaper";
49 private static final int NUM_PARALLEL_JOBS = 10;
50
51 private RouteAdminService routeService;
52 private final ClusterService clusterService;
53 private StorageService storageService;
54
55 private WorkQueue<NodeId> queue;
56
57 private final InternalClusterListener clusterListener = new InternalClusterListener();
58
59 private final ScheduledExecutorService reaperExecutor =
60 newSingleThreadScheduledExecutor(groupedThreads("route/reaper", "", log));
61
62 /**
63 * Creates a new route monitor.
64 *
65 * @param routeService route service
66 * @param clusterService cluster service
67 * @param storageService storage service
68 */
69 public RouteMonitor(RouteAdminService routeService,
70 ClusterService clusterService, StorageService storageService) {
71 this.routeService = routeService;
72 this.clusterService = clusterService;
73 this.storageService = storageService;
74
75 clusterService.addListener(clusterListener);
76
77 queue = storageService.getWorkQueue(TOPIC, Serializer.using(KryoNamespaces.API));
78 queue.addStatusChangeListener(this::statusChange);
79
80 startProcessing();
81 }
82
83 /**
84 * Shuts down the route monitor.
85 */
86 public void shutdown() {
87 stopProcessing();
88 clusterService.removeListener(clusterListener);
89 }
90
91 private void statusChange(DistributedPrimitive.Status status) {
92 switch (status) {
93 case ACTIVE:
94 startProcessing();
95 break;
96 case SUSPENDED:
97 stopProcessing();
98 break;
99 case INACTIVE:
100 default:
101 break;
102 }
103 }
104
105 private void startProcessing() {
106 queue.registerTaskProcessor(this::cleanRoutes, NUM_PARALLEL_JOBS, reaperExecutor);
107 }
108
109 private void stopProcessing() {
110 queue.stopProcessing();
111 }
112
113 private void cleanRoutes(NodeId node) {
114 log.info("Cleaning routes from unavailable node {}", node);
115
116 Collection<Route> routes = routeService.getRouteTables().stream()
117 .flatMap(id -> routeService.getRoutes(id).stream())
118 .flatMap(route -> route.allRoutes().stream())
119 .map(ResolvedRoute::route)
120 .filter(r -> r.sourceNode().equals(node))
121 .collect(Collectors.toList());
122
123 log.debug("Withdrawing routes: {}", routes);
124
125 routeService.withdraw(routes);
126 }
127
128 private class InternalClusterListener implements ClusterEventListener {
129
130 @Override
131 public void event(ClusterEvent event) {
132 switch (event.type()) {
133 case INSTANCE_DEACTIVATED:
134 NodeId id = event.subject().id();
135 log.info("Node {} deactivated", id);
136 queue.addOne(id);
137 break;
138 case INSTANCE_ADDED:
139 case INSTANCE_REMOVED:
140 case INSTANCE_ACTIVATED:
141 case INSTANCE_READY:
142 default:
143 break;
144 }
145 }
146 }
147
148}