Added PartitionedAsyncLeaderElector that federates leader election contents across a collection of AsyncLeaderElectors
Change-Id: I6ae220d4e4d2ed8ae1cd9060482f66f418ae0551
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 99d0897..7be143f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -34,6 +34,7 @@
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
+import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -93,7 +94,15 @@
@Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
- return getCreator(name).newAsyncLeaderElector(name);
+ checkNotNull(name);
+ Map<PartitionId, AsyncLeaderElector> leaderElectors =
+ Maps.transformValues(members,
+ partition -> partition.newAsyncLeaderElector(name));
+ Hasher<String> hasher = topic -> {
+ long hashCode = HashCode.fromBytes(topic.getBytes(Charsets.UTF_8)).asLong();
+ return sortedMemberPartitionIds.get(Hashing.consistentHash(hashCode, members.size()));
+ };
+ return new PartitionedAsyncLeaderElector(name, leaderElectors, hasher);
}
@Override