blob: b341165c382f7d2591c38090be726fe669cd44df [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.intent.impl;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
23import org.apache.felix.scr.annotations.Service;
Jonathan Hartf2fda812015-02-17 15:21:03 -080024import org.onosproject.cluster.ClusterEvent;
25import org.onosproject.cluster.ClusterEventListener;
Jonathan Hart74c83132015-02-02 18:37:57 -080026import org.onosproject.cluster.ClusterService;
Jonathan Hartf2fda812015-02-17 15:21:03 -080027import org.onosproject.cluster.ControllerNode;
Jonathan Hart74c83132015-02-02 18:37:57 -080028import org.onosproject.cluster.Leadership;
29import org.onosproject.cluster.LeadershipEvent;
30import org.onosproject.cluster.LeadershipEventListener;
31import org.onosproject.cluster.LeadershipService;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080032import org.onosproject.cluster.NodeId;
Jonathan Hart5ec32ba2015-02-05 13:33:58 -080033import org.onosproject.net.intent.Key;
Brian O'Connor87ba7a72015-03-11 14:40:09 -070034import org.onosproject.net.intent.PartitionService;
Jonathan Hart74c83132015-02-02 18:37:57 -080035import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
Jonathan Hartdc9d7b82015-02-22 17:59:50 -080038import java.util.List;
Brian O'Connor5eb77c82015-03-02 18:09:39 -080039import java.util.Objects;
Jonathan Hartf2fda812015-02-17 15:21:03 -080040import java.util.concurrent.Executors;
41import java.util.concurrent.ScheduledExecutorService;
42import java.util.concurrent.TimeUnit;
Madan Jampani4732c1b2015-05-19 17:11:50 -070043import java.util.concurrent.atomic.AtomicBoolean;
Jonathan Hartdc9d7b82015-02-22 17:59:50 -080044import java.util.stream.Collectors;
Jonathan Hart74c83132015-02-02 18:37:57 -080045
Jonathan Hart74c83132015-02-02 18:37:57 -080046/**
47 * Manages the assignment of intent keyspace partitions to instances.
48 */
49@Component(immediate = true)
50@Service
51public class PartitionManager implements PartitionService {
52
53 private static final Logger log = LoggerFactory.getLogger(PartitionManager.class);
54
55 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
56 protected LeadershipService leadershipService;
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 protected ClusterService clusterService;
60
Madan Jampani4732c1b2015-05-19 17:11:50 -070061 protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false);
62
Jonathan Hart7061acd2015-03-04 13:15:32 -080063 static final int NUM_PARTITIONS = 14;
Jonathan Hartf2fda812015-02-17 15:21:03 -080064 private static final int BACKOFF_TIME = 2;
Madan Jampani4732c1b2015-05-19 17:11:50 -070065 private static final int CHECK_PARTITION_BALANCE_PERIOD_SEC = 10;
66 private static final int RETRY_AFTER_DELAY_SEC = 5;
Jonathan Hart74c83132015-02-02 18:37:57 -080067
68 private static final String ELECTION_PREFIX = "intent-partition-";
69
70 private LeadershipEventListener leaderListener = new InternalLeadershipListener();
Jonathan Hartf2fda812015-02-17 15:21:03 -080071 private ClusterEventListener clusterListener = new InternalClusterEventListener();
Jonathan Hart74c83132015-02-02 18:37:57 -080072
Jonathan Hartf2fda812015-02-17 15:21:03 -080073 private ScheduledExecutorService executor = Executors
74 .newScheduledThreadPool(1);
Jonathan Hart74c83132015-02-02 18:37:57 -080075
76 @Activate
77 public void activate() {
Jonathan Hart74c83132015-02-02 18:37:57 -080078 leadershipService.addListener(leaderListener);
Jonathan Hartf2fda812015-02-17 15:21:03 -080079 clusterService.addListener(clusterListener);
Jonathan Hart74c83132015-02-02 18:37:57 -080080
81 for (int i = 0; i < NUM_PARTITIONS; i++) {
Jonathan Hartf2fda812015-02-17 15:21:03 -080082 leadershipService.runForLeadership(getPartitionPath(i));
Jonathan Hart74c83132015-02-02 18:37:57 -080083 }
Jonathan Hartf2fda812015-02-17 15:21:03 -080084
Madan Jampani4732c1b2015-05-19 17:11:50 -070085 executor.scheduleAtFixedRate(() -> scheduleRebalance(0), 0,
86 CHECK_PARTITION_BALANCE_PERIOD_SEC, TimeUnit.SECONDS);
Jonathan Hart74c83132015-02-02 18:37:57 -080087 }
88
89 @Deactivate
90 public void deactivate() {
Jonathan Hartac48a952015-02-25 14:11:55 -080091 executor.shutdownNow();
92
Jonathan Hart74c83132015-02-02 18:37:57 -080093 leadershipService.removeListener(leaderListener);
Jonathan Hartf2fda812015-02-17 15:21:03 -080094 clusterService.removeListener(clusterListener);
95 }
96
Jonathan Hart7061acd2015-03-04 13:15:32 -080097 /**
98 * Sets the specified executor to be used for scheduling background tasks.
99 *
100 * @param executor scheduled executor service for background tasks
101 * @return this PartitionManager
102 */
103 public PartitionManager withScheduledExecutor(ScheduledExecutorService executor) {
104 this.executor = executor;
105 return this;
106 }
107
Jonathan Hartf2fda812015-02-17 15:21:03 -0800108 private String getPartitionPath(int i) {
109 return ELECTION_PREFIX + i;
Jonathan Hart74c83132015-02-02 18:37:57 -0800110 }
111
Jonathan Hartdc9d7b82015-02-22 17:59:50 -0800112 private String getPartitionPath(PartitionId id) {
113 return getPartitionPath(id.value());
114 }
115
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800116 private PartitionId getPartitionForKey(Key intentKey) {
Brian O'Connor1fdfacd2015-02-18 20:52:06 -0800117 int partition = Math.abs((int) intentKey.hash()) % NUM_PARTITIONS;
118 //TODO investigate Guava consistent hash method
119 // ... does it add significant computational complexity? is it worth it?
120 //int partition = consistentHash(intentKey.hash(), NUM_PARTITIONS);
121 PartitionId id = new PartitionId(partition);
Brian O'Connor1fdfacd2015-02-18 20:52:06 -0800122 return id;
Jonathan Hart74c83132015-02-02 18:37:57 -0800123 }
124
125 @Override
Jonathan Hart5ec32ba2015-02-05 13:33:58 -0800126 public boolean isMine(Key intentKey) {
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800127 return Objects.equals(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))),
128 clusterService.getLocalNode().id());
129 }
130
131 @Override
132 public NodeId getLeader(Key intentKey) {
133 return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
Jonathan Hart74c83132015-02-02 18:37:57 -0800134 }
135
Madan Jampani4732c1b2015-05-19 17:11:50 -0700136 protected void doRebalance() {
137 rebalanceScheduled.set(false);
Jonathan Hartf2fda812015-02-17 15:21:03 -0800138 try {
Madan Jampani4732c1b2015-05-19 17:11:50 -0700139 rebalance();
Jonathan Hartf2fda812015-02-17 15:21:03 -0800140 } catch (Exception e) {
Madan Jampani4732c1b2015-05-19 17:11:50 -0700141 log.warn("Exception caught during rebalance task. Will retry in " + RETRY_AFTER_DELAY_SEC + " seconds", e);
142 scheduleRebalance(RETRY_AFTER_DELAY_SEC);
Jonathan Hartf2fda812015-02-17 15:21:03 -0800143 }
144 }
145
Jonathan Hartf2fda812015-02-17 15:21:03 -0800146 /**
147 * Determine whether we have more than our fair share of partitions, and if
148 * so, relinquish leadership of some of them for a little while to let
149 * other instances take over.
150 */
Madan Jampani4732c1b2015-05-19 17:11:50 -0700151 private void rebalance() {
Jonathan Hartf2fda812015-02-17 15:21:03 -0800152 int activeNodes = (int) clusterService.getNodes()
153 .stream()
Madan Jampani4732c1b2015-05-19 17:11:50 -0700154 .filter(node -> ControllerNode.State.ACTIVE == clusterService.getState(node.id()))
Jonathan Hartf2fda812015-02-17 15:21:03 -0800155 .count();
156
157 int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes);
158
Jonathan Hartdc9d7b82015-02-22 17:59:50 -0800159 List<Leadership> myPartitions = leadershipService.getLeaderBoard().values()
160 .stream()
161 .filter(l -> clusterService.getLocalNode().id().equals(l.leader()))
162 .filter(l -> l.topic().startsWith(ELECTION_PREFIX))
163 .collect(Collectors.toList());
Jonathan Hartf2fda812015-02-17 15:21:03 -0800164
Jonathan Hartdc9d7b82015-02-22 17:59:50 -0800165 int relinquish = myPartitions.size() - myShare;
Jonathan Hartf2fda812015-02-17 15:21:03 -0800166
Jonathan Hartdc9d7b82015-02-22 17:59:50 -0800167 if (relinquish <= 0) {
168 return;
169 }
Jonathan Hartf2fda812015-02-17 15:21:03 -0800170
Jonathan Hartdc9d7b82015-02-22 17:59:50 -0800171 for (int i = 0; i < relinquish; i++) {
172 String topic = myPartitions.get(i).topic();
173 leadershipService.withdraw(topic);
Jonathan Hartf2fda812015-02-17 15:21:03 -0800174
Jonathan Hartdc9d7b82015-02-22 17:59:50 -0800175 executor.schedule(() -> recontest(topic),
176 BACKOFF_TIME, TimeUnit.SECONDS);
Jonathan Hartf2fda812015-02-17 15:21:03 -0800177 }
178 }
179
Madan Jampani4732c1b2015-05-19 17:11:50 -0700180 private void scheduleRebalance(int afterDelaySec) {
181 if (rebalanceScheduled.compareAndSet(false, true)) {
182 executor.schedule(this::doRebalance, afterDelaySec, TimeUnit.SECONDS);
183 }
184 }
185
Jonathan Hartf2fda812015-02-17 15:21:03 -0800186 /**
187 * Try and recontest for leadership of a partition.
188 *
189 * @param path topic name to recontest
190 */
191 private void recontest(String path) {
192 leadershipService.runForLeadership(path);
193 }
194
Jonathan Hart74c83132015-02-02 18:37:57 -0800195 private final class InternalLeadershipListener implements LeadershipEventListener {
196
197 @Override
198 public void event(LeadershipEvent event) {
199 Leadership leadership = event.subject();
Jonathan Hartdc9d7b82015-02-22 17:59:50 -0800200
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800201 if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
Jonathan Hart74c83132015-02-02 18:37:57 -0800202 leadership.topic().startsWith(ELECTION_PREFIX)) {
203
Jonathan Hartf2fda812015-02-17 15:21:03 -0800204 // See if we need to let some partitions go
Madan Jampani4732c1b2015-05-19 17:11:50 -0700205 scheduleRebalance(0);
Jonathan Hart74c83132015-02-02 18:37:57 -0800206 }
Jonathan Hart74c83132015-02-02 18:37:57 -0800207 }
208 }
Jonathan Hartf2fda812015-02-17 15:21:03 -0800209
210 private final class InternalClusterEventListener implements
211 ClusterEventListener {
212
213 @Override
214 public void event(ClusterEvent event) {
Madan Jampani4732c1b2015-05-19 17:11:50 -0700215 scheduleRebalance(0);
Jonathan Hartf2fda812015-02-17 15:21:03 -0800216 }
217 }
Jonathan Hart74c83132015-02-02 18:37:57 -0800218}