blob: d05670cbf758f4681ffc6cd6bbd783183b26796b [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.vpls;
18
19import com.google.common.collect.Maps;
20import com.google.common.collect.Queues;
21import com.google.common.collect.Sets;
Yi Tsengf4e13e32017-03-30 15:38:39 -070022import org.onlab.util.Tools;
23import org.onosproject.cluster.ClusterService;
24import org.onosproject.cluster.LeadershipEvent;
25import org.onosproject.cluster.LeadershipEventListener;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
Yi Tsengf4e13e32017-03-30 15:38:39 -070030import org.onosproject.net.Host;
31import org.onosproject.net.host.HostService;
32import org.onosproject.net.intent.Intent;
33import org.onosproject.net.intent.IntentEvent;
34import org.onosproject.net.intent.IntentException;
35import org.onosproject.net.intent.IntentListener;
36import org.onosproject.net.intent.IntentService;
37import org.onosproject.net.intent.IntentUtils;
38import org.onosproject.net.intent.Key;
39import org.onosproject.net.intent.MultiPointToSinglePointIntent;
40import org.onosproject.net.intent.SinglePointToMultiPointIntent;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070041import org.onosproject.net.intf.Interface;
Yi Tsengf4e13e32017-03-30 15:38:39 -070042import org.onosproject.vpls.api.VplsData;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070043import org.onosproject.vpls.api.VplsOperation;
Yi Tsengf4e13e32017-03-30 15:38:39 -070044import org.onosproject.vpls.api.VplsOperationException;
45import org.onosproject.vpls.api.VplsOperationService;
Yi Tsengf4e13e32017-03-30 15:38:39 -070046import org.onosproject.vpls.api.VplsStore;
47import org.onosproject.vpls.intent.VplsIntentUtility;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070048import org.osgi.service.component.annotations.Activate;
49import org.osgi.service.component.annotations.Component;
50import org.osgi.service.component.annotations.Deactivate;
51import org.osgi.service.component.annotations.Reference;
52import org.osgi.service.component.annotations.ReferenceCardinality;
Yi Tsengf4e13e32017-03-30 15:38:39 -070053import org.slf4j.Logger;
54
55import java.util.Collection;
56import java.util.Deque;
57import java.util.Map;
58import java.util.Set;
59import java.util.concurrent.CompletableFuture;
60import java.util.concurrent.ExecutionException;
61import java.util.concurrent.ExecutorService;
62import java.util.concurrent.Executors;
63import java.util.concurrent.ScheduledExecutorService;
64import java.util.concurrent.TimeUnit;
65import java.util.concurrent.TimeoutException;
66import java.util.function.Consumer;
67import java.util.stream.Collectors;
68import java.util.stream.Stream;
69
70import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
71import static org.onlab.util.Tools.groupedThreads;
72import static org.slf4j.LoggerFactory.getLogger;
73
74/**
75 * An implementation of VplsOperationService.
76 * Handles the execution order of the VPLS operations generated by the
77 * application.
78 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070079@Component(immediate = true, service = VplsOperationService.class)
Yi Tsengf4e13e32017-03-30 15:38:39 -070080public class VplsOperationManager implements VplsOperationService {
81 private static final int NUM_THREADS = 4;
82
Ray Milkeyd84f89b2018-08-17 14:54:17 -070083 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yi Tsengf4e13e32017-03-30 15:38:39 -070084 protected CoreService coreService;
85
Ray Milkeyd84f89b2018-08-17 14:54:17 -070086 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yi Tsengf4e13e32017-03-30 15:38:39 -070087 protected IntentService intentService;
88
Ray Milkeyd84f89b2018-08-17 14:54:17 -070089 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yi Tsengf4e13e32017-03-30 15:38:39 -070090 protected LeadershipService leadershipService;
91
Ray Milkeyd84f89b2018-08-17 14:54:17 -070092 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yi Tsengf4e13e32017-03-30 15:38:39 -070093 protected ClusterService clusterService;
94
Ray Milkeyd84f89b2018-08-17 14:54:17 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yi Tsengf4e13e32017-03-30 15:38:39 -070096 protected HostService hostService;
97
Ray Milkeyd84f89b2018-08-17 14:54:17 -070098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yi Tsengf4e13e32017-03-30 15:38:39 -070099 protected VplsStore vplsStore;
100
101 private final Logger log = getLogger(getClass());
102 protected Map<String, Deque<VplsOperation>> pendingVplsOperations;
103 protected final Map<String, VplsOperation> runningOperations = Maps.newHashMap();
104 protected ScheduledExecutorService schedulerExecutor;
105 protected ExecutorService workerExecutor;
106 protected ApplicationId appId;
107 protected boolean isLeader;
108 protected NodeId localNodeId;
109 protected LeadershipEventListener leadershipEventListener;
110
111 @Activate
112 public void activate() {
113 appId = coreService.registerApplication(VplsManager.VPLS_APP);
114 localNodeId = clusterService.getLocalNode().id();
115
116 leadershipEventListener = new InternalLeadershipListener();
117 leadershipService.addListener(leadershipEventListener);
118 leadershipService.runForLeadership(appId.name());
119 pendingVplsOperations = Maps.newConcurrentMap();
120
121 // Thread pool for VplsOperationExecutor
122 workerExecutor = newFixedThreadPool(NUM_THREADS,
123 groupedThreads("onos/apps/vpls",
124 "worker-%d",
125 log));
126 // A single thread pool for VplsOperationScheduler
127 schedulerExecutor = Executors.newScheduledThreadPool(1,
128 groupedThreads("onos/apps/vpls",
129 "scheduler-%d",
130 log));
131 // Start the scheduler
132 schedulerExecutor.scheduleAtFixedRate(new VplsOperationScheduler(),
133 0,
134 500,
135 TimeUnit.MILLISECONDS);
136
137 }
138
139 @Deactivate
140 public void deactivate() {
141 pendingVplsOperations.clear();
142 runningOperations.clear();
143 leadershipService.removeListener(leadershipEventListener);
144 schedulerExecutor.shutdown();
145 workerExecutor.shutdown();
146
147 // remove all intents from VPLS application when deactivated
148 Tools.stream(intentService.getIntents())
149 .filter(intent -> intent.appId().equals(appId))
150 .forEach(intentService::withdraw);
151 }
152
153 @Override
154 public void submit(VplsOperation vplsOperation) {
155 if (isLeader) {
156 // Only leader can execute operation
157 addVplsOperation(vplsOperation);
158 }
159 }
160
161 /**
162 * Adds a VPLS operation to the queue of pending operations.
163 *
164 * @param vplsOperation the VPLS operation to add
165 */
166 private void addVplsOperation(VplsOperation vplsOperation) {
167 VplsData vplsData = vplsOperation.vpls();
168 pendingVplsOperations.compute(vplsData.name(), (name, opQueue) -> {
169 opQueue = opQueue == null ? Queues.newArrayDeque() : opQueue;
170
171 // If the operation already exist in queue, ignore it.
172 if (opQueue.contains(vplsOperation)) {
173 return opQueue;
174 }
175 opQueue.add(vplsOperation);
176 return opQueue;
177 });
178 }
179
180 /**
181 * Optimizes the VPLS operation queue and return a single VPLS operation to
182 * execute.
183 *
184 * @param operations the queue to be optimized
185 * @return optimized VPLS operation from the queue
186 */
187 protected static VplsOperation getOptimizedVplsOperation(Deque<VplsOperation> operations) {
188 if (operations.isEmpty()) {
189 return null;
190 }
191 // no need to optimize if the queue contains only one operation
192 if (operations.size() == 1) {
193 return operations.getFirst();
194 }
195
196 final VplsOperation firstOperation = operations.peekFirst();
197 final VplsOperation lastOperation = operations.peekLast();
198 final VplsOperation.Operation firstOp = firstOperation.op();
199 final VplsOperation.Operation lastOp = lastOperation.op();
200
201 if (firstOp.equals(VplsOperation.Operation.REMOVE)) {
202 if (lastOp.equals(VplsOperation.Operation.REMOVE)) {
203 // case 1: both first and last operation are REMOVE; do remove
204 return firstOperation;
205 } else if (lastOp.equals(VplsOperation.Operation.ADD)) {
206 // case 2: if first is REMOVE, and last is ADD; do update
207 return VplsOperation.of(lastOperation.vpls(),
208 VplsOperation.Operation.UPDATE);
209 } else {
210 // case 3: first is REMOVE, last is UPDATE; do update
211 return lastOperation;
212 }
213 } else if (firstOp.equals(VplsOperation.Operation.ADD)) {
214 if (lastOp.equals(VplsOperation.Operation.REMOVE)) {
215 // case 4: first is ADD, last is REMOVE; nothing to do
216 return null;
217 } else if (lastOp.equals(VplsOperation.Operation.ADD)) {
218 // case 5: both first and last are ADD, do add
219 return VplsOperation.of(lastOperation.vpls(),
220 VplsOperation.Operation.ADD);
221 } else {
222 // case 6: first is ADD and last is update, do add
223 return VplsOperation.of(lastOperation.vpls(),
224 VplsOperation.Operation.ADD);
225 }
226 } else {
227 if (lastOp.equals(VplsOperation.Operation.REMOVE)) {
228 // case 7: last is remove, do remove
229 return lastOperation;
230 } else if (lastOp.equals(VplsOperation.Operation.ADD)) {
231 // case 8: do update only
232 return VplsOperation.of(lastOperation.vpls(),
233 VplsOperation.Operation.UPDATE);
234 } else {
235 // case 9: from UPDATE to UPDATE
236 // only need last UPDATE operation
237 return VplsOperation.of(lastOperation.vpls(),
238 VplsOperation.Operation.UPDATE);
239 }
240 }
241 }
242
243 /**
244 * Scheduler for VPLS operation.
245 * Processes a batch of VPLS operations in a period.
246 */
247 class VplsOperationScheduler implements Runnable {
248 private static final String UNKNOWN_STATE =
249 "Unknown state {} for success consumer";
250 private static final String OP_EXEC_ERR =
251 "Error when executing VPLS operation {}, error: {}";
252
253 /**
254 * Process a batch of VPLS operations.
255 */
256 @Override
257 public void run() {
258 Set<String> vplsNames = pendingVplsOperations.keySet();
259 vplsNames.forEach(vplsName -> {
260 VplsOperation operation;
261 synchronized (runningOperations) {
262 // Only one operation for a VPLS at the same time
263 if (runningOperations.containsKey(vplsName)) {
264 return;
265 }
266 Deque<VplsOperation> operations = pendingVplsOperations.remove(vplsName);
267 operation = getOptimizedVplsOperation(operations);
268 if (operation == null) {
269 // Nothing to do, this only happened when we add a VPLS
270 // and remove it before batch operations been processed.
271 return;
272 }
273 runningOperations.put(vplsName, operation);
274 }
275
276 VplsOperationExecutor operationExecutor =
277 new VplsOperationExecutor(operation);
278 operationExecutor.setConsumers(
279 (vplsOperation) -> {
280 // Success consumer
281 VplsData vplsData = vplsOperation.vpls();
282 log.debug("VPLS operation success: {}", vplsOperation);
283 switch (vplsData.state()) {
284 case ADDING:
285 case UPDATING:
286 vplsData.state(VplsData.VplsState.ADDED);
287 vplsStore.updateVpls(vplsData);
288 break;
289 case REMOVING:
290 // The VPLS information does not exists in
291 // store. No need to update the store.
292 break;
293 default:
294 log.warn(UNKNOWN_STATE, vplsData.state());
295 vplsData.state(VplsData.VplsState.FAILED);
296 vplsStore.updateVpls(vplsData);
297 break;
298 }
299 runningOperations.remove(vplsName);
300 },
301 (vplsOperationException) -> {
302 // Error consumer
303 VplsOperation vplsOperation =
304 vplsOperationException.vplsOperation();
Yi Tsengada511f2017-05-18 14:46:04 -0700305 log.warn(OP_EXEC_ERR,
306 vplsOperation.toString(),
307 vplsOperationException.getMessage());
Yi Tsengf4e13e32017-03-30 15:38:39 -0700308 VplsData vplsData = vplsOperation.vpls();
309 vplsData.state(VplsData.VplsState.FAILED);
310 vplsStore.updateVpls(vplsData);
Yi Tsengf4e13e32017-03-30 15:38:39 -0700311 runningOperations.remove(vplsName);
312 });
313 log.debug("Applying operation: {}", operation);
314 workerExecutor.execute(operationExecutor);
315 });
316 }
317 }
318
319 /**
320 * Direction for Intent installation.
321 */
322 private enum Direction {
323 ADD,
324 REMOVE
325 }
326
327 /**
328 * VPLS operation executor.
329 * Installs, updates or removes Intents according to the given VPLS operation.
330 */
331 class VplsOperationExecutor implements Runnable {
332 private static final String UNKNOWN_OP = "Unknown operation.";
333 private static final String UNKNOWN_INTENT_DIR = "Unknown Intent install direction.";
334 private static final int OPERATION_TIMEOUT = 10;
335 private VplsOperation vplsOperation;
336 private Consumer<VplsOperation> successConsumer;
337 private Consumer<VplsOperationException> errorConsumer;
338 private VplsOperationException error;
339
340 public VplsOperationExecutor(VplsOperation vplsOperation) {
341 this.vplsOperation = vplsOperation;
342 this.error = null;
343 }
344
345 /**
346 * Sets success consumer and error consumer for this executor.
347 *
348 * @param successConsumer the success consumer
349 * @param errorConsumer the error consumer
350 */
351 public void setConsumers(Consumer<VplsOperation> successConsumer,
352 Consumer<VplsOperationException> errorConsumer) {
353 this.successConsumer = successConsumer;
354 this.errorConsumer = errorConsumer;
355
356 }
357
358 @Override
359 public void run() {
360 switch (vplsOperation.op()) {
361 case ADD:
362 installVplsIntents();
363 break;
364 case REMOVE:
365 removeVplsIntents();
366 break;
367 case UPDATE:
368 updateVplsIntents();
369 break;
370 default:
371 this.error = new VplsOperationException(vplsOperation,
372 UNKNOWN_OP);
373 break;
374 }
375
376 if (this.error != null) {
377 errorConsumer.accept(this.error);
378 } else {
379 successConsumer.accept(vplsOperation);
380 }
381 }
382
383 /**
384 * Updates Intents of the VPLS.
385 */
386 private void updateVplsIntents() {
387 // check which part we need to update
388 // if we update host only, we don't need to reinstall
389 // every Intents
390 Set<Intent> intentsToInstall = Sets.newHashSet();
391 Set<Intent> intentsToUninstall = Sets.newHashSet();
392 VplsData vplsData = vplsOperation.vpls();
393 Set<Intent> currentIntents = getCurrentIntents();
394
395 // Compares broadcast Intents
396 Set<Intent> currentBrcIntents = currentIntents.stream()
397 .filter(intent -> intent instanceof SinglePointToMultiPointIntent)
398 .collect(Collectors.toSet());
399 Set<Intent> targetBrcIntents = VplsIntentUtility.buildBrcIntents(vplsData, appId);
400 if (!intentSetEquals(currentBrcIntents, targetBrcIntents)) {
401 // If broadcast Intents changes, it means some network
402 // interfaces or encapsulation constraint changed; Need to
403 // reinstall all intents
404 removeVplsIntents();
405 installVplsIntents();
406 return;
407 }
408
409 // Compares unicast Intents
410 Set<Intent> currentUniIntents = currentIntents.stream()
411 .filter(intent -> intent instanceof MultiPointToSinglePointIntent)
412 .collect(Collectors.toSet());
413 Set<Intent> targetUniIntents = VplsIntentUtility.buildUniIntents(vplsData,
414 hostsFromVpls(),
415 appId);
416
417 // New unicast Intents to install
418 targetUniIntents.forEach(intent -> {
419 if (!currentUniIntents.contains(intent)) {
420 intentsToInstall.add(intent);
421 }
422 });
423
424 // Old unicast Intents to remove
425 currentUniIntents.forEach(intent -> {
426 if (!targetUniIntents.contains(intent)) {
427 intentsToUninstall.add(intent);
428 }
429 });
430 applyIntentsSync(intentsToUninstall, Direction.REMOVE);
431 applyIntentsSync(intentsToInstall, Direction.ADD);
432 }
433
434 private Set<Host> hostsFromVpls() {
435 VplsData vplsData = vplsOperation.vpls();
436 Set<Interface> interfaces = vplsData.interfaces();
437 return interfaces.stream()
438 .map(this::hostsFromInterface)
439 .flatMap(Collection::stream)
440 .collect(Collectors.toSet());
441 }
442
443 private Set<Host> hostsFromInterface(Interface iface) {
444 return hostService.getConnectedHosts(iface.connectPoint())
445 .stream()
446 .filter(host -> host.vlan().equals(iface.vlan()))
447 .collect(Collectors.toSet());
448 }
449
450 /**
451 * Applies Intents synchronously with a specific direction.
452 *
453 * @param intents the Intents
454 * @param direction the direction
455 */
456 private void applyIntentsSync(Set<Intent> intents, Direction direction) {
457 Set<Key> pendingIntentKeys = intents.stream()
458 .map(Intent::key).collect(Collectors.toSet());
459 IntentCompleter completer;
460
461 switch (direction) {
462 case ADD:
463 completer = new IntentCompleter(pendingIntentKeys,
464 IntentEvent.Type.INSTALLED);
465 intentService.addListener(completer);
466 intents.forEach(intentService::submit);
467 break;
468 case REMOVE:
469 completer = new IntentCompleter(pendingIntentKeys,
470 IntentEvent.Type.WITHDRAWN);
471 intentService.addListener(completer);
472 intents.forEach(intentService::withdraw);
473 break;
474 default:
475 this.error = new VplsOperationException(this.vplsOperation,
476 UNKNOWN_INTENT_DIR);
477 return;
478 }
479
480 try {
481 // Wait until Intent operation completed
482 completer.complete();
483 } catch (VplsOperationException e) {
484 this.error = e;
485 } finally {
486 intentService.removeListener(completer);
487 }
488 }
489
490 /**
491 * Checks if two sets of Intents are equal.
492 *
493 * @param intentSet1 the first set of Intents
494 * @param intentSet2 the second set of Intents
495 * @return true if both set of Intents are equal; otherwise false
496 */
497 private boolean intentSetEquals(Set<Intent> intentSet1, Set<Intent> intentSet2) {
498 if (intentSet1.size() != intentSet2.size()) {
499 return false;
500 }
501 for (Intent intent1 : intentSet1) {
502 if (intentSet2.stream()
503 .noneMatch(intent2 -> IntentUtils.intentsAreEqual(intent1, intent2))) {
504 return false;
505 }
506 }
507 return true;
508 }
509
510 /**
511 * Retrieves installed Intents from IntentService which related to
512 * specific VPLS.
513 *
514 * @return the Intents which related to the VPLS
515 */
516 private Set<Intent> getCurrentIntents() {
517 VplsData vplsData = vplsOperation.vpls();
518 String vplsName = vplsData.name();
519 return Tools.stream(intentService.getIntents())
520 .filter(intent -> intent.key().toString().startsWith(vplsName))
521 .collect(Collectors.toSet());
522 }
523
524 /**
525 * Generates unicast Intents and broadcast Intents for the VPLS.
526 *
527 * @return Intents for the VPLS
528 */
529 private Set<Intent> generateVplsIntents() {
530 VplsData vplsData = vplsOperation.vpls();
531 Set<Intent> brcIntents = VplsIntentUtility.buildBrcIntents(vplsData, appId);
532 Set<Intent> uniIntent = VplsIntentUtility.buildUniIntents(vplsData, hostsFromVpls(), appId);
533
534 return Stream.concat(brcIntents.stream(), uniIntent.stream())
535 .collect(Collectors.toSet());
536 }
537
538 /**
539 * Removes all Intents from the VPLS.
540 */
541 private void removeVplsIntents() {
542 Set<Intent> intentsToWithdraw = getCurrentIntents();
543 applyIntentsSync(intentsToWithdraw, Direction.REMOVE);
Yi Tsengada511f2017-05-18 14:46:04 -0700544 intentsToWithdraw.forEach(intentService::purge);
Yi Tsengf4e13e32017-03-30 15:38:39 -0700545 }
546
547 /**
548 * Installs Intents of the VPLS.
549 */
550 private void installVplsIntents() {
551 Set<Intent> intentsToInstall = generateVplsIntents();
552 applyIntentsSync(intentsToInstall, Direction.ADD);
553 }
554
555 /**
556 * Helper class which monitors if all Intent operations are completed.
557 */
558 class IntentCompleter implements IntentListener {
559 private static final String INTENT_COMPILE_ERR = "Got {} from intent completer";
560 private CompletableFuture<Void> completableFuture;
561 private Set<Key> pendingIntentKeys;
562 private IntentEvent.Type expectedEventType;
563
564 /**
565 * Initialize completer with given Intent keys and expect Intent
566 * event type.
567 *
568 * @param pendingIntentKeys the Intent keys to wait
569 * @param expectedEventType expect Intent event type
570 */
Yi Tseng0ee6aa22017-05-19 14:43:27 -0700571 public IntentCompleter(Set<Key> pendingIntentKeys,
572 IntentEvent.Type expectedEventType) {
Yi Tsengf4e13e32017-03-30 15:38:39 -0700573 this.completableFuture = new CompletableFuture<>();
574 this.pendingIntentKeys = Sets.newConcurrentHashSet(pendingIntentKeys);
575 this.expectedEventType = expectedEventType;
576 }
577
578 @Override
579 public void event(IntentEvent event) {
580 Intent intent = event.subject();
Yi Tseng0ee6aa22017-05-19 14:43:27 -0700581 Key key = intent.key();
582 if (!pendingIntentKeys.contains(key)) {
583 // ignore Intent events from other VPLS
584 return;
585 }
Yi Tsengf4e13e32017-03-30 15:38:39 -0700586 // Intent failed, throw an exception to completable future
587 if (event.type() == IntentEvent.Type.CORRUPT ||
588 event.type() == IntentEvent.Type.FAILED) {
589 completableFuture.completeExceptionally(new IntentException(intent.toString()));
590 return;
591 }
592 // If event type matched to expected type, remove from pending
593 if (event.type() == expectedEventType) {
Yi Tsengf4e13e32017-03-30 15:38:39 -0700594 pendingIntentKeys.remove(key);
595 }
596 if (pendingIntentKeys.isEmpty()) {
597 completableFuture.complete(null);
598 }
599 }
600
601 /**
602 * Waits until all pending Intents completed ot timeout.
603 */
604 public void complete() {
605 // If no pending Intent keys, complete directly
606 if (pendingIntentKeys.isEmpty()) {
607 return;
608 }
609 try {
610 completableFuture.get(OPERATION_TIMEOUT, TimeUnit.SECONDS);
611 } catch (TimeoutException | InterruptedException |
612 ExecutionException | IntentException e) {
613 // TODO: handle errors more carefully
614 log.warn(INTENT_COMPILE_ERR, e.toString());
615 throw new VplsOperationException(vplsOperation, e.toString());
616 }
617 }
618 }
619 }
620
621 /**
622 * A listener for leadership events.
623 * Only the leader can process VPLS operation in the ONOS cluster.
624 */
625 private class InternalLeadershipListener implements LeadershipEventListener {
626 private static final String LEADER_CHANGE = "Change leader to {}";
627
628 @Override
629 public void event(LeadershipEvent event) {
630 switch (event.type()) {
631 case LEADER_CHANGED:
632 case LEADER_AND_CANDIDATES_CHANGED:
633 isLeader = localNodeId.equals(event.subject().leaderNodeId());
634 if (isLeader) {
635 log.debug(LEADER_CHANGE, localNodeId);
636 }
637 break;
638 default:
639 break;
640 }
641 }
642
643 @Override
644 public boolean isRelevant(LeadershipEvent event) {
645 return event.subject().topic().equals(appId.name());
646 }
647 }
648}