blob: 0dfa0b6408025f56c108372634d589dde9e8c761 [file] [log] [blame]
Aaron Kruglikov29327982015-10-06 17:15:16 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Aaron Kruglikov29327982015-10-06 17:15:16 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.mlb;
18
19import com.google.common.util.concurrent.ListenableScheduledFuture;
20import com.google.common.util.concurrent.ListeningScheduledExecutorService;
21import com.google.common.util.concurrent.MoreExecutors;
Madan Jampani620f70d2016-01-30 22:22:47 -080022
Aaron Kruglikov29327982015-10-06 17:15:16 -070023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
sangyun-han55e17982016-08-03 15:45:38 +090026import org.apache.felix.scr.annotations.Modified;
27import org.apache.felix.scr.annotations.Property;
Aaron Kruglikov29327982015-10-06 17:15:16 -070028import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
sangyun-han55e17982016-08-03 15:45:38 +090030import org.onlab.util.Tools;
31import org.onosproject.cfg.ComponentConfigService;
Aaron Kruglikov29327982015-10-06 17:15:16 -070032import org.onosproject.cluster.ClusterService;
33import org.onosproject.cluster.LeadershipEvent;
34import org.onosproject.cluster.LeadershipEventListener;
35import org.onosproject.cluster.LeadershipService;
36import org.onosproject.cluster.NodeId;
37import org.onosproject.mastership.MastershipAdminService;
38import org.onosproject.mastership.MastershipEvent;
39import org.onosproject.mastership.MastershipListener;
40import org.onosproject.mastership.MastershipService;
sangyun-han55e17982016-08-03 15:45:38 +090041import org.osgi.service.component.ComponentContext;
Aaron Kruglikov29327982015-10-06 17:15:16 -070042import org.slf4j.Logger;
43
sangyun-han55e17982016-08-03 15:45:38 +090044import java.util.Dictionary;
Aaron Kruglikov29327982015-10-06 17:15:16 -070045import java.util.concurrent.Future;
46import java.util.concurrent.TimeUnit;
47import java.util.concurrent.atomic.AtomicBoolean;
48import java.util.concurrent.atomic.AtomicReference;
49
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070050import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
51import static org.onlab.util.Tools.groupedThreads;
Aaron Kruglikov29327982015-10-06 17:15:16 -070052import static org.slf4j.LoggerFactory.getLogger;
53
54/**
55 * An app to perform automatic load balancing in response to events. Load balancing events are triggered by any
56 * change in mastership and are limited to a frequency of one every 30 seconds, all load balancing is run on an outside
57 * thread executor that must only have one thread due to issues that can occur is multiple balancing events occur in
58 * parallel.
59 */
60@Component(immediate = true)
61public class MastershipLoadBalancer {
62
63 private final Logger log = getLogger(getClass());
64
sangyun-han55e17982016-08-03 15:45:38 +090065 private static final int DEFAULT_SCHEDULE_PERIOD = 5;
66 @Property(name = "schedulePeriod", intValue = DEFAULT_SCHEDULE_PERIOD,
67 label = "Period to schedule balancing the mastership to be shared as evenly as by all online instances.")
68 private int schedulePeriod = DEFAULT_SCHEDULE_PERIOD;
69
Aaron Kruglikov29327982015-10-06 17:15:16 -070070 private static final String REBALANCE_MASTERSHIP = "rebalance/mastership";
71
72 private NodeId localId;
73
74 private AtomicBoolean isLeader = new AtomicBoolean(false);
75
76 private AtomicReference<Future> nextTask = new AtomicReference<>();
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected MastershipService mastershipService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected MastershipAdminService mastershipAdminService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected LeadershipService leadershipService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected ClusterService clusterService;
89
sangyun-han55e17982016-08-03 15:45:38 +090090 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 protected ComponentConfigService cfgService;
92
Aaron Kruglikov29327982015-10-06 17:15:16 -070093 private InnerLeadershipListener leadershipListener = new InnerLeadershipListener();
94
95 /* This listener is used to trigger balancing for any mastership event which will include switches changing state
96 between active and inactive states as well as the same variety of event occurring with ONOS nodes. Must
97 use a listenable executor to ensure events are triggered with no frequency greater than once every 30 seconds.
98 */
99 private InnerMastershipListener mastershipListener = new InnerMastershipListener();
100
101 //Ensures that all executions do not interfere with one another (single thread)
102 private ListeningScheduledExecutorService executorService = MoreExecutors.
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700103 listeningDecorator(newSingleThreadScheduledExecutor(groupedThreads("MastershipLoadBalancer", "%d", log)));
Aaron Kruglikov29327982015-10-06 17:15:16 -0700104
105 @Activate
sangyun-han55e17982016-08-03 15:45:38 +0900106 public void activate(ComponentContext context) {
107 cfgService.registerProperties(getClass());
108 modified(context);
Aaron Kruglikov29327982015-10-06 17:15:16 -0700109 mastershipService.addListener(mastershipListener);
110 localId = clusterService.getLocalNode().id();
111 leadershipService.addListener(leadershipListener);
112 leadershipService.runForLeadership(REBALANCE_MASTERSHIP);
113 log.info("Started");
114 }
115
116 @Deactivate
117 public void deactivate() {
sangyun-han55e17982016-08-03 15:45:38 +0900118 cfgService.unregisterProperties(getClass(), false);
Aaron Kruglikov29327982015-10-06 17:15:16 -0700119 mastershipService.removeListener(mastershipListener);
120 leadershipService.withdraw(REBALANCE_MASTERSHIP);
121 leadershipService.removeListener(leadershipListener);
122 cancelBalance();
123 executorService.shutdown();
124 log.info("Stopped");
125 }
126
sangyun-han55e17982016-08-03 15:45:38 +0900127 @Modified
128 public void modified(ComponentContext context) {
129 readComponentConfiguration(context);
130 cancelBalance();
131 scheduleBalance();
132 log.info("modified");
133 }
134
Madan Jampani620f70d2016-01-30 22:22:47 -0800135 private synchronized void processLeaderChange(NodeId newLeader) {
Aaron Kruglikov29327982015-10-06 17:15:16 -0700136 boolean currLeader = newLeader.equals(localId);
137 if (isLeader.getAndSet(currLeader) != currLeader) {
138 if (currLeader) {
139 scheduleBalance();
140 } else {
141 cancelBalance();
142 }
143 }
144 }
145
146 private void scheduleBalance() {
147 if (isLeader.get() && nextTask.get() == null) {
148
Simon Hunt89431db2016-03-08 15:02:43 -0800149 ListenableScheduledFuture task =
150 executorService.schedule(mastershipAdminService::balanceRoles,
sangyun-han55e17982016-08-03 15:45:38 +0900151 schedulePeriod, TimeUnit.SECONDS);
Aaron Kruglikov29327982015-10-06 17:15:16 -0700152 task.addListener(() -> {
153 log.info("Completed balance roles");
154 nextTask.set(null);
155 }, MoreExecutors.directExecutor()
156 );
157 if (!nextTask.compareAndSet(null, task)) {
158 task.cancel(false);
159 }
160 }
161 }
162
163 private void cancelBalance() {
164 Future task = nextTask.getAndSet(null);
165 if (task != null) {
166 task.cancel(false);
167 }
168 }
169
sangyun-han55e17982016-08-03 15:45:38 +0900170 /**
171 * Extracts properties from the component configuration context.
172 *
173 * @param context the component context
174 */
175 private void readComponentConfiguration(ComponentContext context) {
176 Dictionary<?, ?> properties = context.getProperties();
177
178 Integer newSchedulePeriod = Tools.getIntegerProperty(properties,
179 "schedulePeriod");
180 if (newSchedulePeriod == null) {
181 schedulePeriod = DEFAULT_SCHEDULE_PERIOD;
182 log.info("Schedule period is not configured, default value is {}",
183 DEFAULT_SCHEDULE_PERIOD);
184 } else {
185 schedulePeriod = newSchedulePeriod;
186 log.info("Configured. Schedule period is configured to {}", schedulePeriod);
187 }
188 }
189
Aaron Kruglikov29327982015-10-06 17:15:16 -0700190 private class InnerMastershipListener implements MastershipListener {
191
192 @Override
193 public void event(MastershipEvent event) {
194 //Sets flag at execution to indicate there is currently a scheduled rebalancing, reverts upon completion
195 scheduleBalance();
196 }
197 }
198
199 private class InnerLeadershipListener implements LeadershipEventListener {
200 @Override
201 public boolean isRelevant(LeadershipEvent event) {
202 return REBALANCE_MASTERSHIP.equals(event.subject().topic());
203 }
204
205 @Override
206 public void event(LeadershipEvent event) {
Madan Jampani620f70d2016-01-30 22:22:47 -0800207 processLeaderChange(event.subject().leaderNodeId());
Aaron Kruglikov29327982015-10-06 17:15:16 -0700208 }
209 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800210}