Refactored IntentPartitionService as WorkPartitionService
Change-Id: Ic5cf1978b7fce55b34f84eae9b03c8f9ddcfb9c1
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index e914579..2eb1d2e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -32,7 +32,7 @@
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
-import org.onosproject.net.intent.IntentPartitionService;
+import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
@@ -84,7 +84,7 @@
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected IntentPartitionService partitionService;
+ protected WorkPartitionService partitionService;
private final AtomicLong sequenceNumber = new AtomicLong(0);
@@ -211,7 +211,7 @@
}
private Collection<NodeId> getPeerNodes(Key key, IntentData data) {
- NodeId master = partitionService.getLeader(key);
+ NodeId master = partitionService.getLeader(key, Key::hash);
NodeId origin = (data != null) ? data.origin() : null;
if (data != null && (master == null || origin == null)) {
log.debug("Intent {} missing master and/or origin; master = {}, origin = {}",
@@ -283,7 +283,7 @@
@Override
public boolean isMaster(Key intentKey) {
- return partitionService.isMine(intentKey);
+ return partitionService.isMine(intentKey, Key::hash);
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/WorkPartitionManager.java
similarity index 77%
rename from core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
rename to core/store/dist/src/main/java/org/onosproject/store/intent/impl/WorkPartitionManager.java
index ab266b6..337bc55 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/WorkPartitionManager.java
@@ -29,10 +29,9 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
-import org.onosproject.net.intent.IntentPartitionEvent;
-import org.onosproject.net.intent.IntentPartitionEventListener;
-import org.onosproject.net.intent.IntentPartitionService;
-import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.WorkPartitionEvent;
+import org.onosproject.net.intent.WorkPartitionEventListener;
+import org.onosproject.net.intent.WorkPartitionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,17 +43,18 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
- * Manages the assignment of intent keyspace partitions to instances.
+ * Manages the assignment of work partitions to instances.
*/
@Component(immediate = true)
@Service
-public class IntentPartitionManager implements IntentPartitionService {
+public class WorkPartitionManager implements WorkPartitionService {
- private static final Logger log = LoggerFactory.getLogger(IntentPartitionManager.class);
+ private static final Logger log = LoggerFactory.getLogger(WorkPartitionManager.class);
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@@ -72,14 +72,14 @@
private static final int CHECK_PARTITION_BALANCE_PERIOD_SEC = 10;
private static final int RETRY_AFTER_DELAY_SEC = 5;
- private static final String ELECTION_PREFIX = "intent-partition-";
+ private static final String ELECTION_PREFIX = "work-partition-";
protected NodeId localNodeId;
- private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
+ private ListenerRegistry<WorkPartitionEvent, WorkPartitionEventListener> listenerRegistry;
private LeadershipEventListener leaderListener = new InternalLeadershipListener();
private ScheduledExecutorService executor = Executors
- .newScheduledThreadPool(1, groupedThreads("IntentPartition", "balancer-%d", log));
+ .newScheduledThreadPool(1, groupedThreads("work-partition", "balancer-%d", log));
@Activate
public void activate() {
@@ -87,7 +87,7 @@
leadershipService.addListener(leaderListener);
listenerRegistry = new ListenerRegistry<>();
- eventDispatcher.addSink(IntentPartitionEvent.class, listenerRegistry);
+ eventDispatcher.addSink(WorkPartitionEvent.class, listenerRegistry);
for (int i = 0; i < NUM_PARTITIONS; i++) {
leadershipService.runForLeadership(getPartitionPath(i));
@@ -103,7 +103,7 @@
public void deactivate() {
executor.shutdownNow();
- eventDispatcher.removeSink(IntentPartitionEvent.class);
+ eventDispatcher.removeSink(WorkPartitionEvent.class);
leadershipService.removeListener(leaderListener);
log.info("Stopped");
}
@@ -112,9 +112,9 @@
* Sets the specified executor to be used for scheduling background tasks.
*
* @param executor scheduled executor service for background tasks
- * @return this PartitionManager
+ * @return this WorkPartitionManager
*/
- IntentPartitionManager withScheduledExecutor(ScheduledExecutorService executor) {
+ WorkPartitionManager withScheduledExecutor(ScheduledExecutorService executor) {
this.executor = executor;
return this;
}
@@ -123,38 +123,25 @@
return ELECTION_PREFIX + i;
}
- private String getPartitionPath(PartitionId id) {
- return getPartitionPath(id.value());
- }
-
- private PartitionId getPartitionForKey(Key intentKey) {
- int partition = Math.abs((int) intentKey.hash()) % NUM_PARTITIONS;
- //TODO investigate Guava consistent hash method
- // ... does it add significant computational complexity? is it worth it?
- //int partition = consistentHash(intentKey.hash(), NUM_PARTITIONS);
- PartitionId id = new PartitionId(partition);
- return id;
+ @Override
+ public <K> boolean isMine(K id, Function<K, Long> hasher) {
+ // The identifier is ours when the leader of its partition is this node.
+ final NodeId partitionLeader = getLeader(id, hasher);
+ return Objects.equals(localNodeId, partitionLeader);
}
@Override
- public boolean isMine(Key intentKey) {
- return Objects.equals(leadershipService.getLeadership(getPartitionPath(getPartitionForKey(intentKey)))
- .leaderNodeId(),
- localNodeId);
+ public <K> NodeId getLeader(K id, Function<K, Long> hasher) {
+ // Take the remainder before the absolute value: Math.abs(Integer.MIN_VALUE)
+ // is still negative and would produce a negative partition index. For every
+ // other hash value the selected partition is the same as before.
+ int partition = Math.abs(hasher.apply(id).intValue() % NUM_PARTITIONS);
+ PartitionId partitionId = new PartitionId(partition);
+ // NOTE(review): getLeadership() may return null for a topic with no
+ // leadership yet - confirm; the pre-refactor getLeader(Key) went through
+ // leadershipService.getLeader(topic) instead of dereferencing the result.
+ return leadershipService.getLeadership(getPartitionPath(partitionId.value())).leaderNodeId();
}
@Override
- public NodeId getLeader(Key intentKey) {
- return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
- }
-
- @Override
- public void addListener(IntentPartitionEventListener listener) {
+ public void addListener(WorkPartitionEventListener listener) {
listenerRegistry.addListener(listener);
}
@Override
- public void removeListener(IntentPartitionEventListener listener) {
+ public void removeListener(WorkPartitionEventListener listener) {
listenerRegistry.removeListener(listener);
}
@@ -235,7 +222,7 @@
if (Objects.equals(leadership.leaderNodeId(), localNodeId) &&
leadership.topic().startsWith(ELECTION_PREFIX)) {
- eventDispatcher.post(new IntentPartitionEvent(IntentPartitionEvent.Type.LEADER_CHANGED,
+ eventDispatcher.post(new WorkPartitionEvent(WorkPartitionEvent.Type.LEADER_CHANGED,
leadership.topic()));
}