blob: 1519e248439f04f47c2ce429886691911ef7bb62 [file] [log] [blame]
Saritha80cb8542019-12-09 10:39:20 +05301/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.diagnosis.impl;
17
18import org.osgi.service.component.annotations.Activate;
19import org.osgi.service.component.annotations.Component;
20import org.osgi.service.component.annotations.Modified;
21import org.osgi.service.component.annotations.Deactivate;
22import org.osgi.service.component.runtime.ServiceComponentRuntime;
23import org.osgi.service.component.annotations.Reference;
24import org.osgi.service.component.annotations.ReferenceCardinality;
25import org.osgi.service.component.runtime.dto.ComponentConfigurationDTO;
26import org.osgi.service.component.runtime.dto.ComponentDescriptionDTO;
27import org.osgi.service.component.ComponentContext;
28import org.apache.karaf.bundle.core.BundleService;
29import org.apache.karaf.system.SystemService;
30import org.onlab.osgi.DefaultServiceDirectory;
31import org.onosproject.cfg.ComponentConfigService;
32import org.onosproject.cluster.ClusterAdminService;
33import org.onosproject.cluster.ControllerNode;
34import org.onosproject.cluster.NodeId;
35import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
36import org.onosproject.store.cluster.messaging.ClusterMessage;
37import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
38import org.onosproject.store.cluster.messaging.MessageSubject;
39import org.osgi.framework.Bundle;
40import org.osgi.framework.BundleContext;
41import org.osgi.framework.FrameworkUtil;
42import org.osgi.framework.Constants;
43import org.osgi.framework.wiring.FrameworkWiring;
44import org.slf4j.Logger;
45
46import java.util.Dictionary;
47import java.time.Instant;
48import java.time.temporal.ChronoUnit;
49import java.util.Arrays;
50import java.util.ArrayList;
51import java.util.List;
52import java.util.Objects;
53import java.util.Set;
54import java.util.concurrent.CompletableFuture;
55import java.util.concurrent.ExecutionException;
56import java.util.concurrent.ScheduledExecutorService;
57import java.util.concurrent.ScheduledFuture;
58import java.util.concurrent.TimeUnit;
59import java.util.stream.Collectors;
60
61import static com.google.common.collect.Lists.newArrayList;
62import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
63import static org.onlab.util.SharedExecutors.getPoolThreadExecutor;
64import static org.onlab.util.Tools.groupedThreads;
65import static org.slf4j.LoggerFactory.getLogger;
66
67import static org.onlab.util.Tools.getIntegerProperty;
68import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.INITIAL_POLL_DELAY_MINUTE;
69import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.POLL_FREQUENCY_MINUTE;
70import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.DEFAULT_INITIAL_POLL_DELAY_MINUTE;
71import static org.onosproject.diagnosis.impl.OsgiPropertyConstants.DEFAULT_POLL_FREQUENCY_MINUTE;
72
73
74@Component(immediate = true,
75 property = {
76 INITIAL_POLL_DELAY_MINUTE + ":Integer=" + DEFAULT_INITIAL_POLL_DELAY_MINUTE,
77 POLL_FREQUENCY_MINUTE + ":Integer=" + DEFAULT_POLL_FREQUENCY_MINUTE,
78 })
79public class NodeDiagnosisManager {
80
81 private static final MessageSubject REBOOT_MSG = new MessageSubject("Node-diagnosis");
82
83 private static int initialPollDelayMinute = DEFAULT_INITIAL_POLL_DELAY_MINUTE;
84 private static int pollFrequencyMinute = DEFAULT_POLL_FREQUENCY_MINUTE;
85
86 private final Logger log = getLogger(getClass());
87 private ScheduledExecutorService metricsExecutor;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 private ServiceComponentRuntime scrService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 private BundleService bundleService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 private SystemService systemService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 private ClusterCommunicationService communicationService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected ComponentConfigService cfgService;
103
104 private ScheduledFuture<?> clusterNodeDiagnosisFuture;
105
106 private BundleContext bundleContext;
107 private ClusterAdminService caService;
108 private NodeId localNodeId;
109 private Set<NodeId> nodeIds;
110
111 private static final long TIMEOUT = 3000;
112
113 @Activate
114 public void activate() {
115 cfgService.registerProperties(getClass());
116 bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
117 getNodeId();
118 scheduleAppDiagnosisPolling();
119 scheduleClusterNodeDiagnosisPolling();
120 communicationService.addSubscriber(REBOOT_MSG, new InternalSampleCollector(),
121 getPoolThreadExecutor());
122 log.info("Started");
123 }
124
125 @Deactivate
126 public void deactivate() {
127 cfgService.unregisterProperties(getClass(), false);
128 communicationService.removeSubscriber(REBOOT_MSG);
129 metricsExecutor.shutdownNow();
130 clusterNodeDiagnosisFuture.cancel(true);
131 log.info("Stopped");
132 }
133
134 @Modified
135 public void modified(ComponentContext context) {
136 readComponentConfiguration(context);
137 log.info("modified");
138 }
139
140 /**
141 * Extracts properties from the component configuration context.
142 *
143 * @param context the component context
144 */
145 private void readComponentConfiguration(ComponentContext context) {
146
147 Dictionary<?, ?> properties = context.getProperties();
148 boolean changed = false;
149
150 int newPollFrequency = getNewPollFrequency(properties);
151 if (newPollFrequency != pollFrequencyMinute) {
152 pollFrequencyMinute = newPollFrequency;
153 changed = true;
154 }
155
156 int newPollDelay = getNewPollDelay(properties);
157 if (newPollDelay != pollFrequencyMinute) {
158 initialPollDelayMinute = newPollDelay;
159 changed = true;
160 }
161 log.info("Node Diagnosis properties are:" +
162 " initialPollDelayMinute: {}, pollFrequencyMinute: {}",
163 initialPollDelayMinute, pollFrequencyMinute);
164 if (changed) {
165 //stops the old scheduled task
166 this.clusterNodeDiagnosisFuture.cancel(true);
167 //schedules new task at the new polling rate
168 log.info("New Scheduler started with,Node Diagnosis properties:" +
169 " initialPollDelayMinute: {}, pollFrequencyMinute: {}",
170 initialPollDelayMinute, pollFrequencyMinute);
171 scheduleClusterNodeDiagnosisPolling();
172 }
173 }
174
175 private int getNewPollFrequency(Dictionary<?, ?> properties) {
176 int newPollFrequency;
177 try {
178 newPollFrequency = getIntegerProperty(properties, POLL_FREQUENCY_MINUTE);
179 } catch (NumberFormatException | ClassCastException e) {
180 newPollFrequency = DEFAULT_POLL_FREQUENCY_MINUTE;
181 }
182 return newPollFrequency;
183 }
184
185 private int getNewPollDelay(Dictionary<?, ?> properties) {
186 int newPollDelay;
187 try {
188 newPollDelay = getIntegerProperty(properties, INITIAL_POLL_DELAY_MINUTE);
189 } catch (NumberFormatException | ClassCastException e) {
190 newPollDelay = DEFAULT_INITIAL_POLL_DELAY_MINUTE;
191 }
192 return newPollDelay;
193 }
194
195 private List<Bundle> getAllBundles() {
196 return Arrays.asList(bundleContext.getBundles());
197 }
198
199 private void getNodeId() {
200 caService = DefaultServiceDirectory.getService(ClusterAdminService.class);
201 if (Objects.isNull(caService)) {
202 return;
203 }
204 List<ControllerNode> controllerNodes = newArrayList(caService.getNodes());
205 nodeIds = controllerNodes
206 .stream()
207 .map(ControllerNode::id)
208 .collect(Collectors.toSet());
209
210 localNodeId = caService.getLocalNode().id();
211 }
212
213 private void scheduleAppDiagnosisPolling() {
214 metricsExecutor = newSingleThreadScheduledExecutor(
215 groupedThreads("Nodediagnosis/diagnosisThread",
216 "Nodediagnosis-executor-%d", log));
217 metricsExecutor.scheduleAtFixedRate(this::appDiagnosis,
218 60,
219 30, TimeUnit.SECONDS);
220 }
221
222 private void scheduleClusterNodeDiagnosisPolling() {
223 clusterNodeDiagnosisFuture = metricsExecutor.scheduleAtFixedRate(this::clusterNodeDiagnosis,
224 initialPollDelayMinute,
225 pollFrequencyMinute, TimeUnit.MINUTES);
226 }
227
228 private void appDiagnosis() {
229 verifyBundles(null);
230 verifyApps();
231 }
232
233 private void verifyBundles(String bundleName) {
234
235 if (Objects.isNull(bundleContext)) {
236 return;
237 }
238 try {
239 FrameworkWiring wiring = bundleContext.getBundle(Constants.SYSTEM_BUNDLE_LOCATION)
240 .adapt(FrameworkWiring.class);
241 if (Objects.isNull(wiring)) {
242 return;
243 }
244
245 boolean result;
246 List<Bundle> bundleList;
247 if (Objects.nonNull(bundleName)) {
248 log.info("bundle to be resolved and refreshed: {}", bundleName);
249 bundleList = this.getAllBundles().stream()
250 .filter(bundle -> bundleService.getInfo(bundle).getName().equals(bundleName))
251 .collect(Collectors.toList());
252 } else {
253 bundleList = this.getAllBundles().stream()
254 .filter(bundle -> bundleService.getDiag(bundle).split("[\n|\r]").length > 1)
255 .collect(Collectors.toList());
256 }
257 /**
258 * Example diags :
259 * BundleName:onos-providers-openflow-flow,
260 * Diag:Declarative Services
261 * ,number of lines of diag:1
262 * BundleName:onos-apps-faultmanagement-fmgui,
263 * Diag:Declarative Services
264 * org.onosproject.faultmanagement.alarms.gui.AlarmTableComponent (136)
265 * missing references: uiExtensionService
266 * org.onosproject.faultmanagement.alarms.gui.AlarmTopovComponent (137)
267 * missing references: uiExtensionService
268 * number of lines of diag:5
269 */
270 this.getAllBundles().forEach(
271 bundle -> {
272 log.debug("Bundle service - BundleName:{}, Diag:{}, number of lines of diag:{}",
273 bundleService.getInfo(bundle).getName(),
274 bundleService.getDiag(bundle),
275 bundleService.getDiag(bundle).split("[\n|\r]").length);
276 });
277
278
279 CompletableFuture<Boolean> completableBundles = CompletableFuture.supplyAsync(() -> {
280 Boolean isResolved = wiring.resolveBundles(bundleList);
281
282 wiring.refreshBundles(bundleList);
283 return isResolved;
284 });
285 result = completableBundles.get();
286
287 if (Objects.nonNull(bundleName)) {
288 log.info("bundle {} is in resolved State ? {}", bundleName, result ? "Yes" : "No");
289 } else {
290 log.info("All the bundles are in resolved State ? {}", result ? "Yes" : "No");
291 }
292 } catch (InterruptedException | ExecutionException e) {
293 log.error("exception occurred because of", e);
294 } catch (Exception e) {
295 log.error("Exception occured in Verifying Bundles", e);
296 }
297 }
298
299 private void verifyApps() {
300 log.debug("verifyApps() method invoked");
301 List<ComponentDescriptionDTO> nonActiveComponents = getNonActiveComponents();
302
303 nonActiveComponents.forEach(component -> {
304 try {
305 scrService.enableComponent(component).timeout(TIMEOUT);
306 } catch (Exception e) {
307 throw new IllegalStateException("Unable to start component " + component.name, e);
308 }
309 });
310 }
311
312 private List<ComponentDescriptionDTO> getNonActiveComponents() {
313 List<ComponentDescriptionDTO> nonActiveComponents = new ArrayList<>();
314 for (ComponentDescriptionDTO component : scrService.getComponentDescriptionDTOs()) {
315 if (scrService.isComponentEnabled(component)) {
316 for (ComponentConfigurationDTO config : scrService.getComponentConfigurationDTOs(component)) {
317 if (config.state != ComponentConfigurationDTO.ACTIVE) {
318 nonActiveComponents.add(component);
319 break;
320 }
321 }
322 }
323 }
324 return nonActiveComponents;
325 }
326
327 private void clusterNodeDiagnosis() {
328 if (Objects.isNull(caService)) {
329 return;
330 }
331
332 List<ControllerNode> nodes = newArrayList(caService.getNodes());
333 Set<NodeId> activeNodes = nodes
334 .stream()
335 .filter(node -> caService.getState(node.id()) == ControllerNode.State.ACTIVE)
336 .filter(node -> caService.getLastUpdatedInstant(node.id()).until(Instant.now(), ChronoUnit.MINUTES) > 4)
337 .map(ControllerNode::id)
338 .collect(Collectors.toSet());
339 boolean isNodesActive = nodes
340 .stream().filter(node -> !(caService.getState(node.id()) == ControllerNode.State.INACTIVE))
341 .allMatch(node -> caService.getState(node.id()) == ControllerNode.State.ACTIVE);
342 if (Objects.nonNull(activeNodes) && !activeNodes.isEmpty()) {
343 multicastReboot(isNodesActive, activeNodes);
344 }
345 }
346
347
348 public void restartNode() {
349 try {
350 systemService.reboot("now", SystemService.Swipe.CACHE);
351 } catch (Exception e) {
352 log.error("error occured because of {} ", e.getMessage());
353 }
354 }
355
356 private void multicastReboot(boolean removeDb, Set<NodeId> nodeIds) {
357 String data = "Reboot:" + removeDb;
358 communicationService.multicast(data, REBOOT_MSG, String::getBytes, nodeIds);
359 }
360
361 private class InternalSampleCollector implements ClusterMessageHandler {
362 @Override
363 public void handle(ClusterMessage message) {
364 String reqMsg = new String(message.payload());
365 log.info("Cluster communication message subject{} and message {}",
366 message.subject(), reqMsg);
367 boolean flag = Boolean.parseBoolean(reqMsg.split(":")[1].trim());
368 if (flag) {
369 System.setProperty("apache.karaf.removedb", "true");
370 }
371 if (message.subject().equals(REBOOT_MSG)) {
372 restartNode();
373 }
374 }
375 }
376}