blob: bcf4e2ef7819543264326e5a6fe066409fdd1a2c [file] [log] [blame]
Aaron Kruglikov29327982015-10-06 17:15:16 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.mlb;
18
19import com.google.common.util.concurrent.ListenableScheduledFuture;
20import com.google.common.util.concurrent.ListeningScheduledExecutorService;
21import com.google.common.util.concurrent.MoreExecutors;
22import org.apache.felix.scr.annotations.Activate;
23import org.apache.felix.scr.annotations.Component;
24import org.apache.felix.scr.annotations.Deactivate;
25import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.LeadershipEvent;
29import org.onosproject.cluster.LeadershipEventListener;
30import org.onosproject.cluster.LeadershipService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.mastership.MastershipAdminService;
33import org.onosproject.mastership.MastershipEvent;
34import org.onosproject.mastership.MastershipListener;
35import org.onosproject.mastership.MastershipService;
36import org.slf4j.Logger;
37
38import java.util.concurrent.Executors;
39import java.util.concurrent.Future;
40import java.util.concurrent.TimeUnit;
41import java.util.concurrent.atomic.AtomicBoolean;
42import java.util.concurrent.atomic.AtomicReference;
43
44import static org.slf4j.LoggerFactory.getLogger;
45
46/**
47 * An app to perform automatic load balancing in response to events. Load balancing events are triggered by any
48 * change in mastership and are limited to a frequency of one every 30 seconds, all load balancing is run on an outside
49 * thread executor that must only have one thread due to issues that can occur is multiple balancing events occur in
50 * parallel.
51 */
52@Component(immediate = true)
53public class MastershipLoadBalancer {
54
55 private final Logger log = getLogger(getClass());
56
57 private static final String REBALANCE_MASTERSHIP = "rebalance/mastership";
58
59 private NodeId localId;
60
61 private AtomicBoolean isLeader = new AtomicBoolean(false);
62
63 private AtomicReference<Future> nextTask = new AtomicReference<>();
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 protected MastershipService mastershipService;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected MastershipAdminService mastershipAdminService;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected LeadershipService leadershipService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected ClusterService clusterService;
76
77 private InnerLeadershipListener leadershipListener = new InnerLeadershipListener();
78
79 /* This listener is used to trigger balancing for any mastership event which will include switches changing state
80 between active and inactive states as well as the same variety of event occurring with ONOS nodes. Must
81 use a listenable executor to ensure events are triggered with no frequency greater than once every 30 seconds.
82 */
83 private InnerMastershipListener mastershipListener = new InnerMastershipListener();
84
85 //Ensures that all executions do not interfere with one another (single thread)
86 private ListeningScheduledExecutorService executorService = MoreExecutors.
87 listeningDecorator(Executors.newSingleThreadScheduledExecutor());
88
89 @Activate
90 public void activate() {
91 mastershipService.addListener(mastershipListener);
92 localId = clusterService.getLocalNode().id();
93 leadershipService.addListener(leadershipListener);
94 leadershipService.runForLeadership(REBALANCE_MASTERSHIP);
95 log.info("Started");
96 }
97
98 @Deactivate
99 public void deactivate() {
100 mastershipService.removeListener(mastershipListener);
101 leadershipService.withdraw(REBALANCE_MASTERSHIP);
102 leadershipService.removeListener(leadershipListener);
103 cancelBalance();
104 executorService.shutdown();
105 log.info("Stopped");
106 }
107
108 private synchronized void processLeadershipChange(NodeId newLeader) {
109 if (newLeader == null) {
110 return;
111 }
112 boolean currLeader = newLeader.equals(localId);
113 if (isLeader.getAndSet(currLeader) != currLeader) {
114 if (currLeader) {
115 scheduleBalance();
116 } else {
117 cancelBalance();
118 }
119 }
120 }
121
122 private void scheduleBalance() {
123 if (isLeader.get() && nextTask.get() == null) {
124
125 ListenableScheduledFuture task = executorService.schedule(mastershipAdminService::balanceRoles, 30,
126 TimeUnit.SECONDS);
127 task.addListener(() -> {
128 log.info("Completed balance roles");
129 nextTask.set(null);
130 }, MoreExecutors.directExecutor()
131 );
132 if (!nextTask.compareAndSet(null, task)) {
133 task.cancel(false);
134 }
135 }
136 }
137
138 private void cancelBalance() {
139 Future task = nextTask.getAndSet(null);
140 if (task != null) {
141 task.cancel(false);
142 }
143 }
144
145 private class InnerMastershipListener implements MastershipListener {
146
147 @Override
148 public void event(MastershipEvent event) {
149 //Sets flag at execution to indicate there is currently a scheduled rebalancing, reverts upon completion
150 scheduleBalance();
151 }
152 }
153
154 private class InnerLeadershipListener implements LeadershipEventListener {
155 @Override
156 public boolean isRelevant(LeadershipEvent event) {
157 return REBALANCE_MASTERSHIP.equals(event.subject().topic());
158 }
159
160 @Override
161 public void event(LeadershipEvent event) {
162 processLeadershipChange(event.subject().leader());
163 }
164 }
165}