blob: 3bd3cefa55fe2d1c921394c477ac8d238bad1e15 [file] [log] [blame]
/*
* Copyright 2015-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.mlb;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipAdminService;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.osgi.service.component.ComponentContext;
import org.onosproject.net.region.RegionEvent;
import org.onosproject.net.region.RegionListener;
import org.onosproject.net.region.RegionService;
import org.slf4j.Logger;
import java.util.Dictionary;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* An app to perform automatic load balancing in response to events. Load balancing events are triggered by any
* change in mastership and are limited to a frequency of one every 30 seconds, all load balancing is run on an outside
* thread executor that must only have one thread due to issues that can occur is multiple balancing events occur in
* parallel.
*/
@Component(immediate = true)
public class MastershipLoadBalancer {
private final Logger log = getLogger(getClass());
private static final int DEFAULT_SCHEDULE_PERIOD = 30;
@Property(name = "schedulePeriod", intValue = DEFAULT_SCHEDULE_PERIOD,
label = "Period to schedule balancing the mastership to be shared as evenly as by all online instances.")
private int schedulePeriod = DEFAULT_SCHEDULE_PERIOD;
private static final String REBALANCE_MASTERSHIP = "rebalance/mastership";
private NodeId localId;
private AtomicBoolean isLeader = new AtomicBoolean(false);
private AtomicReference<Future> nextTask = new AtomicReference<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipAdminService mastershipAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected RegionService regionService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService cfgService;
private InnerLeadershipListener leadershipListener = new InnerLeadershipListener();
/* This listener is used to trigger balancing for any mastership event
* which will include switches changing state between active and inactive
* states as well as the same variety of event occurring with ONOS nodes.
*/
private InnerMastershipListener mastershipListener = new InnerMastershipListener();
/* Used to trigger balancing on region events where there was either a
* change on the master sets of a given region or a change on the devices
* that belong to a region.
*/
private InnerRegionListener regionEventListener = new InnerRegionListener();
/* Ensures that all executions do not interfere with one another (single
* thread) and that they are apart from each other by at least what is
* defined as the schedulePeriod.
*/
private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
groupedThreads("MastershipLoadBalancer", "%d", log));
@Activate
public void activate(ComponentContext context) {
cfgService.registerProperties(getClass());
modified(context);
mastershipService.addListener(mastershipListener);
localId = clusterService.getLocalNode().id();
leadershipService.addListener(leadershipListener);
leadershipService.runForLeadership(REBALANCE_MASTERSHIP);
regionService.addListener(regionEventListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
cfgService.unregisterProperties(getClass(), false);
mastershipService.removeListener(mastershipListener);
leadershipService.withdraw(REBALANCE_MASTERSHIP);
leadershipService.removeListener(leadershipListener);
regionService.removeListener(regionEventListener);
cancelBalance();
executorService.shutdown();
log.info("Stopped");
}
@Modified
public void modified(ComponentContext context) {
readComponentConfiguration(context);
cancelBalance();
scheduleBalance();
log.info("modified");
}
private synchronized void processLeaderChange(NodeId newLeader) {
boolean currLeader = newLeader.equals(localId);
if (isLeader.getAndSet(currLeader) != currLeader) {
if (currLeader) {
scheduleBalance();
} else {
cancelBalance();
}
}
}
// Sets flag at execution to indicate there is currently a scheduled
// rebalancing. As soon as it starts running, the flag is set back to
// null and another rebalancing can be queued.
private void scheduleBalance() {
if (isLeader.get() && nextTask.get() == null) {
Future task = executorService.schedule(new BalanceTask(),
schedulePeriod, TimeUnit.SECONDS);
if (!nextTask.compareAndSet(null, task)) {
task.cancel(false);
}
}
}
private class BalanceTask implements Runnable {
@Override
public void run() {
// nextTask is now running, free the spot so that it is possible
// to queue up another upcoming task.
nextTask.set(null);
mastershipAdminService.balanceRoles();
log.info("Completed balance roles");
}
}
private void cancelBalance() {
Future task = nextTask.getAndSet(null);
if (task != null) {
task.cancel(false);
}
}
/**
* Extracts properties from the component configuration context.
*
* @param context the component context
*/
private void readComponentConfiguration(ComponentContext context) {
Dictionary<?, ?> properties = context.getProperties();
Integer newSchedulePeriod = Tools.getIntegerProperty(properties,
"schedulePeriod");
if (newSchedulePeriod == null) {
schedulePeriod = DEFAULT_SCHEDULE_PERIOD;
log.info("Schedule period is not configured, default value is {}",
DEFAULT_SCHEDULE_PERIOD);
} else {
schedulePeriod = newSchedulePeriod;
log.info("Configured. Schedule period is configured to {}", schedulePeriod);
}
}
private class InnerMastershipListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
scheduleBalance();
}
}
private class InnerLeadershipListener implements LeadershipEventListener {
@Override
public boolean isRelevant(LeadershipEvent event) {
return REBALANCE_MASTERSHIP.equals(event.subject().topic());
}
@Override
public void event(LeadershipEvent event) {
processLeaderChange(event.subject().leaderNodeId());
}
}
private class InnerRegionListener implements RegionListener {
@Override
public void event(RegionEvent event) {
switch (event.type()) {
case REGION_MEMBERSHIP_CHANGED:
case REGION_UPDATED:
scheduleBalance();
break;
default:
break;
}
}
}
}