Added couple of methods to LeadershipService.
Change-Id: I259b1a282a51af9425e941a720336f89d66f1097
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
index 75cf90f..65ec687 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
@@ -16,6 +16,7 @@
package org.onosproject.cluster;
import java.util.Map;
+import java.util.Set;
/**
* Service for leader election.
@@ -27,13 +28,27 @@
public interface LeadershipService {
/**
- * Gets the most recent leader for the topic.
+ * Returns the current leader for the topic.
* @param path topic
* @return nodeId of the leader, null if so such topic exists.
*/
NodeId getLeader(String path);
/**
+ * Returns the current leadership info for the topic.
+ * @param path topic
+ * @return leadership info or null if so such topic exists.
+ */
+ Leadership getLeadership(String path);
+
+ /**
+ * Returns the set of topics owned by the specified node.
+ * @param nodeId node Id.
+ * @return set of topics for which this node is the current leader.
+ */
+ Set<String> ownedTopics(NodeId nodeId);
+
+ /**
* Joins the leadership contest.
* @param path topic for which this controller node wishes to be a leader.
*/
@@ -45,6 +60,10 @@
*/
void withdraw(String path);
+ /**
+ * Returns the current leader board.
+ * @return mapping from topic to leadership info.
+ */
Map<String, Leadership> getLeaderBoard();
/**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index a276367..ef651cb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -46,10 +46,12 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.groupedThreads;
@@ -162,6 +164,28 @@
}
@Override
+ public Leadership getLeadership(String path) {
+ checkArgument(path != null);
+ Topic topic = topics.get(path);
+ if (topic != null) {
+ return new Leadership(topic.topicName(),
+ topic.leader(),
+ topic.term());
+ }
+ return null;
+ }
+
+ @Override
+ public Set<String> ownedTopics(NodeId nodeId) {
+ checkArgument(nodeId != null);
+ return topics.values()
+ .stream()
+ .filter(topic -> nodeId.equals(topic.leader()))
+ .map(topic -> topic.topicName)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
public void runForLeadership(String path) {
checkArgument(path != null);
Topic topic = new Topic(path);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
deleted file mode 100644
index 2e414d4..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.LeadershipEvent;
-import org.onosproject.cluster.LeadershipEventListener;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.service.Lock;
-import org.onosproject.store.service.LockService;
-import org.slf4j.Logger;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Distributed implementation of LeadershipService that is based on the primitives exposed by
- * LockService.
- */
-@Component(enabled = false)
-@Service
-public class LeadershipManager implements LeadershipService {
-
- private final Logger log = getLogger(getClass());
-
- private static final int TERM_DURATION_MS = 2000;
-
- // Time to wait before retrying leadership after
- // a unexpected error.
- private static final int WAIT_BEFORE_RETRY_MS = 2000;
-
- // TODO: Make Thread pool size configurable.
- private final ScheduledExecutorService threadPool =
- Executors.newScheduledThreadPool(25, groupedThreads("onos/leadership", "manager-%d"));
-
- private static final MessageSubject LEADERSHIP_UPDATES =
- new MessageSubject("leadership-contest-updates");
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private LockService lockService;
-
- private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
-
- private final Map<String, Lock> openContests = Maps.newConcurrentMap();
- private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
- private NodeId localNodeId;
-
- private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
- private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
-
- private ExecutorService messageHandlingExecutor;
-
- public static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .build()
- .populate(1);
- }
- };
-
- @Activate
- public void activate() {
- localNodeId = clusterService.getLocalNode().id();
-
- addListener(peerAdvertiser);
- addListener(leaderBoardUpdater);
-
- messageHandlingExecutor = Executors.newSingleThreadExecutor(
- groupedThreads("onos/store/leadership",
- "peer-advertisement-handler"));
-
- clusterCommunicator.addSubscriber(
- LEADERSHIP_UPDATES,
- new PeerAdvertisementHandler(),
- messageHandlingExecutor);
-
- log.info("Started.");
- }
-
- @Deactivate
- public void deactivate() {
- removeListener(peerAdvertiser);
- removeListener(leaderBoardUpdater);
-
- clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
-
- messageHandlingExecutor.shutdown();
- threadPool.shutdown();
-
- log.info("Stopped.");
- }
-
-
- @Override
- public NodeId getLeader(String path) {
- synchronized (leaderBoard) {
- Leadership leadership = leaderBoard.get(path);
- if (leadership != null) {
- return leadership.leader();
- }
- }
- return null;
- }
-
- @Override
- public void runForLeadership(String path) {
- checkArgument(path != null);
-
- if (openContests.containsKey(path)) {
- log.info("Already in the leadership contest for {}", path);
- return;
- } else {
- Lock lock = lockService.create(path);
- openContests.put(path, lock);
- threadPool.schedule(new TryLeadership(lock), 0, TimeUnit.MILLISECONDS);
- }
- }
-
- @Override
- public void withdraw(String path) {
- checkArgument(path != null);
-
- Lock lock = openContests.remove(path);
-
- if (lock != null && lock.isLocked()) {
- lock.unlock();
- notifyListeners(
- new LeadershipEvent(
- LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(lock.path(), localNodeId, lock.epoch())));
- }
- }
-
- @Override
- public Map<String, Leadership> getLeaderBoard() {
- return ImmutableMap.copyOf(leaderBoard);
- }
-
- @Override
- public void addListener(LeadershipEventListener listener) {
- checkArgument(listener != null);
- listeners.add(listener);
- }
-
- @Override
- public void removeListener(LeadershipEventListener listener) {
- checkArgument(listener != null);
- listeners.remove(listener);
- }
-
- private void notifyListeners(LeadershipEvent event) {
- for (LeadershipEventListener listener : listeners) {
- try {
- listener.event(event);
- } catch (Exception e) {
- log.error("Notifying listener failed with exception.", e);
- }
- }
- }
-
- private void tryAcquireLeadership(String path) {
- Lock lock = openContests.get(path);
- if (lock == null) {
- // withdrew from race.
- return;
- }
- lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
- if (error == null) {
- threadPool.schedule(
- new ReelectionTask(lock),
- TERM_DURATION_MS / 2,
- TimeUnit.MILLISECONDS);
- notifyListeners(
- new LeadershipEvent(
- LeadershipEvent.Type.LEADER_ELECTED,
- new Leadership(lock.path(), localNodeId, lock.epoch())));
- return;
- } else {
- log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
- threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
- }
- });
- }
-
- private class ReelectionTask implements Runnable {
-
- private final Lock lock;
-
- public ReelectionTask(Lock lock) {
- this.lock = lock;
- }
-
- @Override
- public void run() {
- if (!openContests.containsKey(lock.path())) {
- log.debug("Node withdrew from leadership race for {}. Cancelling reelection task.", lock.path());
- return;
- }
-
- boolean lockExtended = false;
- try {
- lockExtended = lock.extendExpiration(TERM_DURATION_MS);
- } catch (Exception e) {
- log.warn("Attempt to extend lock failed with an exception.", e);
- }
-
- if (lockExtended) {
- notifyListeners(
- new LeadershipEvent(
- LeadershipEvent.Type.LEADER_REELECTED,
- new Leadership(lock.path(), localNodeId, lock.epoch())));
- threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
- } else {
- // Check if this node already withdrew from the contest, in which case
- // we don't need to notify here.
- if (openContests.containsKey(lock.path())) {
- notifyListeners(
- new LeadershipEvent(
- LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(lock.path(), localNodeId, lock.epoch())));
- // Retry leadership after a brief wait.
- threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
- }
- }
- }
- }
-
- private class TryLeadership implements Runnable {
- private final Lock lock;
-
- public TryLeadership(Lock lock) {
- this.lock = lock;
- }
-
- @Override
- public void run() {
- tryAcquireLeadership(lock.path());
- }
- }
-
- private class PeerAdvertiser implements LeadershipEventListener {
- @Override
- public void event(LeadershipEvent event) {
- // publish events originating on this host.
- if (event.subject().leader().equals(localNodeId)) {
- clusterCommunicator.broadcast(
- new ClusterMessage(
- localNodeId,
- LEADERSHIP_UPDATES,
- SERIALIZER.encode(event)));
- }
- }
- }
-
- private class PeerAdvertisementHandler implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- LeadershipEvent event = SERIALIZER.decode(message.payload());
- log.trace("Received {} from {}", event, message.sender());
- notifyListeners(event);
- }
- }
-
- private class LeaderBoardUpdater implements LeadershipEventListener {
- @Override
- public void event(LeadershipEvent event) {
- Leadership leadershipUpdate = event.subject();
- synchronized (leaderBoard) {
- Leadership currentLeadership = leaderBoard.get(leadershipUpdate.topic());
- switch (event.type()) {
- case LEADER_ELECTED:
- case LEADER_REELECTED:
- if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
- leaderBoard.put(leadershipUpdate.topic(), leadershipUpdate);
- }
- break;
- case LEADER_BOOTED:
- if (currentLeadership != null && currentLeadership.epoch() <= leadershipUpdate.epoch()) {
- leaderBoard.remove(leadershipUpdate.topic());
- }
- break;
- default:
- break;
- }
- }
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 69ea1f7..787b372 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -2,6 +2,7 @@
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
+import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.Map.Entry;
@@ -10,6 +11,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -171,6 +173,22 @@
}
@Override
+ public Leadership getLeadership(String path) {
+ checkArgument(path != null);
+ return leaderBoard.get(path);
+ }
+
+ @Override
+ public Set<String> ownedTopics(NodeId nodeId) {
+ checkArgument(nodeId != null);
+ return leaderBoard.entrySet()
+ .stream()
+ .filter(entry -> nodeId.equals(entry.getValue().leader()))
+ .map(Entry::getKey)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
public void runForLeadership(String path) {
log.info("Running for leadership for topic: {}", path);
activeTopics.add(path);
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
index 74349b3..d61772e 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
@@ -15,10 +15,14 @@
*/
package org.onosproject.store.trivial.impl;
+import static com.google.common.base.Preconditions.checkArgument;
+
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
@@ -55,6 +59,22 @@
}
@Override
+ public Leadership getLeadership(String path) {
+ checkArgument(path != null);
+ return elections.get(path) ? new Leadership(path, clusterService.getLocalNode().id(), 0) : null;
+ }
+
+ @Override
+ public Set<String> ownedTopics(NodeId nodeId) {
+ checkArgument(nodeId != null);
+ return elections.entrySet()
+ .stream()
+ .filter(Entry::getValue)
+ .map(Entry::getKey)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
public void runForLeadership(String path) {
elections.put(path, true);
for (LeadershipEventListener listener : listeners) {
@@ -88,5 +108,4 @@
public void removeListener(LeadershipEventListener listener) {
listeners.remove(listener);
}
-
}