blob: 6c194f774e650dbdac098b469f7068e0e9e25855 [file] [log] [blame]
Aaron Kruglikov29327982015-10-06 17:15:16 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Aaron Kruglikov29327982015-10-06 17:15:16 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.mlb;
18
19import com.google.common.util.concurrent.ListenableScheduledFuture;
20import com.google.common.util.concurrent.ListeningScheduledExecutorService;
21import com.google.common.util.concurrent.MoreExecutors;
Madan Jampani620f70d2016-01-30 22:22:47 -080022
Aaron Kruglikov29327982015-10-06 17:15:16 -070023import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.LeadershipEvent;
30import org.onosproject.cluster.LeadershipEventListener;
31import org.onosproject.cluster.LeadershipService;
32import org.onosproject.cluster.NodeId;
33import org.onosproject.mastership.MastershipAdminService;
34import org.onosproject.mastership.MastershipEvent;
35import org.onosproject.mastership.MastershipListener;
36import org.onosproject.mastership.MastershipService;
37import org.slf4j.Logger;
38
Aaron Kruglikov29327982015-10-06 17:15:16 -070039import java.util.concurrent.Future;
40import java.util.concurrent.TimeUnit;
41import java.util.concurrent.atomic.AtomicBoolean;
42import java.util.concurrent.atomic.AtomicReference;
43
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070044import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
45import static org.onlab.util.Tools.groupedThreads;
Aaron Kruglikov29327982015-10-06 17:15:16 -070046import static org.slf4j.LoggerFactory.getLogger;
47
48/**
49 * An app to perform automatic load balancing in response to events. Load balancing events are triggered by any
50 * change in mastership and are limited to a frequency of one every 30 seconds, all load balancing is run on an outside
51 * thread executor that must only have one thread due to issues that can occur is multiple balancing events occur in
52 * parallel.
53 */
54@Component(immediate = true)
55public class MastershipLoadBalancer {
56
57 private final Logger log = getLogger(getClass());
58
Simon Hunt89431db2016-03-08 15:02:43 -080059 // TODO: parameterize via component config
60 private static final int SCHEDULE_PERIOD = 5;
Aaron Kruglikov29327982015-10-06 17:15:16 -070061 private static final String REBALANCE_MASTERSHIP = "rebalance/mastership";
62
63 private NodeId localId;
64
65 private AtomicBoolean isLeader = new AtomicBoolean(false);
66
67 private AtomicReference<Future> nextTask = new AtomicReference<>();
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected MastershipService mastershipService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected MastershipAdminService mastershipAdminService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected LeadershipService leadershipService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected ClusterService clusterService;
80
81 private InnerLeadershipListener leadershipListener = new InnerLeadershipListener();
82
83 /* This listener is used to trigger balancing for any mastership event which will include switches changing state
84 between active and inactive states as well as the same variety of event occurring with ONOS nodes. Must
85 use a listenable executor to ensure events are triggered with no frequency greater than once every 30 seconds.
86 */
87 private InnerMastershipListener mastershipListener = new InnerMastershipListener();
88
89 //Ensures that all executions do not interfere with one another (single thread)
90 private ListeningScheduledExecutorService executorService = MoreExecutors.
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070091 listeningDecorator(newSingleThreadScheduledExecutor(groupedThreads("MastershipLoadBalancer", "%d", log)));
Aaron Kruglikov29327982015-10-06 17:15:16 -070092
93 @Activate
94 public void activate() {
95 mastershipService.addListener(mastershipListener);
96 localId = clusterService.getLocalNode().id();
97 leadershipService.addListener(leadershipListener);
98 leadershipService.runForLeadership(REBALANCE_MASTERSHIP);
99 log.info("Started");
100 }
101
102 @Deactivate
103 public void deactivate() {
104 mastershipService.removeListener(mastershipListener);
105 leadershipService.withdraw(REBALANCE_MASTERSHIP);
106 leadershipService.removeListener(leadershipListener);
107 cancelBalance();
108 executorService.shutdown();
109 log.info("Stopped");
110 }
111
Madan Jampani620f70d2016-01-30 22:22:47 -0800112 private synchronized void processLeaderChange(NodeId newLeader) {
Aaron Kruglikov29327982015-10-06 17:15:16 -0700113 boolean currLeader = newLeader.equals(localId);
114 if (isLeader.getAndSet(currLeader) != currLeader) {
115 if (currLeader) {
116 scheduleBalance();
117 } else {
118 cancelBalance();
119 }
120 }
121 }
122
123 private void scheduleBalance() {
124 if (isLeader.get() && nextTask.get() == null) {
125
Simon Hunt89431db2016-03-08 15:02:43 -0800126 ListenableScheduledFuture task =
127 executorService.schedule(mastershipAdminService::balanceRoles,
128 SCHEDULE_PERIOD, TimeUnit.SECONDS);
Aaron Kruglikov29327982015-10-06 17:15:16 -0700129 task.addListener(() -> {
130 log.info("Completed balance roles");
131 nextTask.set(null);
132 }, MoreExecutors.directExecutor()
133 );
134 if (!nextTask.compareAndSet(null, task)) {
135 task.cancel(false);
136 }
137 }
138 }
139
140 private void cancelBalance() {
141 Future task = nextTask.getAndSet(null);
142 if (task != null) {
143 task.cancel(false);
144 }
145 }
146
147 private class InnerMastershipListener implements MastershipListener {
148
149 @Override
150 public void event(MastershipEvent event) {
151 //Sets flag at execution to indicate there is currently a scheduled rebalancing, reverts upon completion
152 scheduleBalance();
153 }
154 }
155
156 private class InnerLeadershipListener implements LeadershipEventListener {
157 @Override
158 public boolean isRelevant(LeadershipEvent event) {
159 return REBALANCE_MASTERSHIP.equals(event.subject().topic());
160 }
161
162 @Override
163 public void event(LeadershipEvent event) {
Madan Jampani620f70d2016-01-30 22:22:47 -0800164 processLeaderChange(event.subject().leaderNodeId());
Aaron Kruglikov29327982015-10-06 17:15:16 -0700165 }
166 }
Madan Jampani620f70d2016-01-30 22:22:47 -0800167}