Refactored IntentPartitionService as WorkPartitionService
Change-Id: Ic5cf1978b7fce55b34f84eae9b03c8f9ddcfb9c1
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
index 104d9c7..9b758d4 100644
--- a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -51,7 +51,7 @@
import org.onosproject.net.intent.IntentListener;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
-import org.onosproject.net.intent.IntentPartitionService;
+import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.intent.PointToPointIntent;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -139,7 +139,7 @@
protected MastershipService mastershipService;
@Reference(cardinality = MANDATORY_UNARY)
- protected IntentPartitionService partitionService;
+ protected WorkPartitionService partitionService;
@Reference(cardinality = MANDATORY_UNARY)
protected ComponentConfigService configService;
@@ -371,7 +371,7 @@
for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
Key key = Key.of(keyPrefix + k, appId);
- NodeId leader = partitionService.getLeader(key);
+ NodeId leader = partitionService.getLeader(key, Key::hash);
if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
// Bail if we are not sending to this node or we have enough for this node
continue;
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentPartitionService.java b/core/api/src/main/java/org/onosproject/net/intent/IntentPartitionService.java
deleted file mode 100644
index 4933600..0000000
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentPartitionService.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.net.intent;
-
-import com.google.common.annotations.Beta;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.ListenerService;
-
-/**
- * Service for interacting with the intent partition-to-instance assignments.
- */
-@Beta
-public interface IntentPartitionService
- extends ListenerService<IntentPartitionEvent, IntentPartitionEventListener> {
-
- /**
- * Returns whether the given intent key is in a partition owned by this
- * instance or not.
- *
- * @param intentKey intent key to query
- * @return true if the key is owned by this instance, otherwise false
- */
- boolean isMine(Key intentKey);
-
- /**
- * Returns the leader for a particular key.
- *
- * @param intentKey intent key to query
- * @return the leader node
- */
- NodeId getLeader(Key intentKey);
-
- // TODO add API for rebalancing partitions
-
-}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentPartitionEvent.java b/core/api/src/main/java/org/onosproject/net/intent/WorkPartitionEvent.java
similarity index 85%
rename from core/api/src/main/java/org/onosproject/net/intent/IntentPartitionEvent.java
rename to core/api/src/main/java/org/onosproject/net/intent/WorkPartitionEvent.java
index 018724e..7e6571c 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentPartitionEvent.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/WorkPartitionEvent.java
@@ -23,13 +23,13 @@
*/
//TODO change String into a proper object type
@Beta
-public class IntentPartitionEvent extends AbstractEvent<IntentPartitionEvent.Type, String> {
+public class WorkPartitionEvent extends AbstractEvent<WorkPartitionEvent.Type, String> {
public enum Type {
LEADER_CHANGED
}
- public IntentPartitionEvent(Type type, String partition) {
+ public WorkPartitionEvent(Type type, String partition) {
super(type, partition);
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentPartitionEventListener.java b/core/api/src/main/java/org/onosproject/net/intent/WorkPartitionEventListener.java
similarity index 89%
rename from core/api/src/main/java/org/onosproject/net/intent/IntentPartitionEventListener.java
rename to core/api/src/main/java/org/onosproject/net/intent/WorkPartitionEventListener.java
index 2a8701d..7dae192 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentPartitionEventListener.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/WorkPartitionEventListener.java
@@ -22,5 +22,5 @@
* Entity capable of receiving device partition-related events.
*/
@Beta
-public interface IntentPartitionEventListener extends EventListener<IntentPartitionEvent> {
+public interface WorkPartitionEventListener extends EventListener<WorkPartitionEvent> {
}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/WorkPartitionService.java b/core/api/src/main/java/org/onosproject/net/intent/WorkPartitionService.java
new file mode 100644
index 0000000..354bca4
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/intent/WorkPartitionService.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.intent;
+
+import java.util.function.Function;
+
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.ListenerService;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Service for partitioning work, represented via a unique identifier, onto cluster nodes.
+ */
+@Beta
+public interface WorkPartitionService
+ extends ListenerService<WorkPartitionEvent, WorkPartitionEventListener> {
+
+ /**
+ * Returns whether a given id maps to a partition owned by this
+ * instance.
+ *
+ * @param id id
+ * @param hasher function that maps id to a long value
+ * @return {@code true} if the id maps to a partition owned by this instance, otherwise {@code false}
+ */
+ <K> boolean isMine(K id, Function<K, Long> hasher);
+
+ /**
+ * Returns the owner for a given id.
+ *
+ * @param id id to query
+ * @param hasher function that maps id to a long value
+ * @return the leader node identifier
+ */
+ <K> NodeId getLeader(K id, Function<K, Long> hasher);
+}
diff --git a/core/api/src/test/java/org/onosproject/net/intent/IntentPartitionServiceAdapter.java b/core/api/src/test/java/org/onosproject/net/intent/WorkPartitionServiceAdapter.java
similarity index 66%
rename from core/api/src/test/java/org/onosproject/net/intent/IntentPartitionServiceAdapter.java
rename to core/api/src/test/java/org/onosproject/net/intent/WorkPartitionServiceAdapter.java
index 3cf8e5f..31c65ab 100644
--- a/core/api/src/test/java/org/onosproject/net/intent/IntentPartitionServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/net/intent/WorkPartitionServiceAdapter.java
@@ -15,29 +15,31 @@
*/
package org.onosproject.net.intent;
+import java.util.function.Function;
+
import org.onosproject.cluster.NodeId;
/**
- * Testing adapter for the IntentPartitionService.
+ * Testing adapter for the WorkPartitionService.
*/
-public class IntentPartitionServiceAdapter implements IntentPartitionService {
+public class WorkPartitionServiceAdapter implements WorkPartitionService {
@Override
- public boolean isMine(Key intentKey) {
+ public <K> boolean isMine(K id, Function<K, Long> hasher) {
return true;
}
@Override
- public NodeId getLeader(Key intentKey) {
+ public <K> NodeId getLeader(K id, Function<K, Long> hasher) {
return null;
}
@Override
- public void addListener(IntentPartitionEventListener listener) {
+ public void addListener(WorkPartitionEventListener listener) {
}
@Override
- public void removeListener(IntentPartitionEventListener listener) {
+ public void removeListener(WorkPartitionEventListener listener) {
}
}
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleClusterStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleClusterStore.java
index d04e4e2..ea65b54 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleClusterStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleClusterStore.java
@@ -16,6 +16,7 @@
package org.onosproject.store.trivial;
import com.google.common.collect.ImmutableSet;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -32,14 +33,14 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
-import org.onosproject.net.intent.Key;
-import org.onosproject.net.intent.IntentPartitionEvent;
-import org.onosproject.net.intent.IntentPartitionEventListener;
-import org.onosproject.net.intent.IntentPartitionService;
+import org.onosproject.net.intent.WorkPartitionEvent;
+import org.onosproject.net.intent.WorkPartitionEventListener;
+import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.store.AbstractStore;
import org.slf4j.Logger;
import java.util.Set;
+import java.util.function.Function;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.*;
@@ -53,7 +54,7 @@
@Service
public class SimpleClusterStore
extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
- implements ClusterStore, IntentPartitionService {
+ implements ClusterStore, WorkPartitionService {
public static final IpAddress LOCALHOST = IpAddress.valueOf("127.0.0.1");
@@ -66,7 +67,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
- private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
+ private ListenerRegistry<WorkPartitionEvent, WorkPartitionEventListener> listenerRegistry;
private boolean started = false;
@Activate
@@ -74,14 +75,14 @@
instance = new DefaultControllerNode(new NodeId("local"), LOCALHOST);
listenerRegistry = new ListenerRegistry<>();
- eventDispatcher.addSink(IntentPartitionEvent.class, listenerRegistry);
+ eventDispatcher.addSink(WorkPartitionEvent.class, listenerRegistry);
log.info("Started");
}
@Deactivate
public void deactivate() {
- eventDispatcher.removeSink(IntentPartitionEvent.class);
+ eventDispatcher.removeSink(WorkPartitionEvent.class);
log.info("Stopped");
}
@@ -126,25 +127,25 @@
}
@Override
- public boolean isMine(Key intentKey) {
+ public <K> boolean isMine(K key, Function<K, Long> hasher) {
checkPermission(INTENT_READ);
return true;
}
@Override
- public NodeId getLeader(Key intentKey) {
+ public <K> NodeId getLeader(K key, Function<K, Long> hasher) {
checkPermission(INTENT_READ);
return instance.id();
}
@Override
- public void addListener(IntentPartitionEventListener listener) {
+ public void addListener(WorkPartitionEventListener listener) {
checkPermission(INTENT_EVENT);
listenerRegistry.addListener(listener);
}
@Override
- public void removeListener(IntentPartitionEventListener listener) {
+ public void removeListener(WorkPartitionEventListener listener) {
checkPermission(INTENT_EVENT);
listenerRegistry.removeListener(listener);
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
index 24abda4..b11bb0e 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
@@ -43,9 +43,9 @@
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
-import org.onosproject.net.intent.IntentPartitionEvent;
-import org.onosproject.net.intent.IntentPartitionEventListener;
-import org.onosproject.net.intent.IntentPartitionService;
+import org.onosproject.net.intent.WorkPartitionEvent;
+import org.onosproject.net.intent.WorkPartitionEventListener;
+import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.resource.ResourceEvent;
import org.onosproject.net.resource.ResourceListener;
@@ -113,7 +113,7 @@
protected IntentService intentService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected IntentPartitionService partitionService;
+ protected WorkPartitionService partitionService;
private ExecutorService executorService =
newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker", log));
@@ -124,7 +124,7 @@
private ResourceListener resourceListener = new InternalResourceListener();
private DeviceListener deviceListener = new InternalDeviceListener();
private HostListener hostListener = new InternalHostListener();
- private IntentPartitionEventListener partitionListener = new InternalPartitionListener();
+ private WorkPartitionEventListener partitionListener = new InternalPartitionListener();
private TopologyChangeDelegate delegate;
protected final AtomicBoolean updateScheduled = new AtomicBoolean(false);
@@ -417,9 +417,9 @@
}
}
- private final class InternalPartitionListener implements IntentPartitionEventListener {
+ private final class InternalPartitionListener implements WorkPartitionEventListener {
@Override
- public void event(IntentPartitionEvent event) {
+ public void event(WorkPartitionEvent event) {
log.debug("got message {}", event.subject());
scheduleIntentUpdate(1);
}
diff --git a/core/security/src/main/java/org/onosproject/security/impl/DefaultPolicyBuilder.java b/core/security/src/main/java/org/onosproject/security/impl/DefaultPolicyBuilder.java
index 62a7e1d..92b4eec 100644
--- a/core/security/src/main/java/org/onosproject/security/impl/DefaultPolicyBuilder.java
+++ b/core/security/src/main/java/org/onosproject/security/impl/DefaultPolicyBuilder.java
@@ -59,7 +59,7 @@
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentExtensionService;
import org.onosproject.net.intent.IntentClockService;
-import org.onosproject.net.intent.IntentPartitionService;
+import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.link.LinkAdminService;
import org.onosproject.net.link.LinkService;
import org.onosproject.net.packet.PacketService;
@@ -227,7 +227,7 @@
permSet.add(new ServicePermission(IntentService.class.getName(), ServicePermission.GET));
permSet.add(new ServicePermission(IntentClockService.class.getName(), ServicePermission.GET));
permSet.add(new ServicePermission(IntentExtensionService.class.getName(), ServicePermission.GET));
- permSet.add(new ServicePermission(IntentPartitionService.class.getName(), ServicePermission.GET));
+ permSet.add(new ServicePermission(WorkPartitionService.class.getName(), ServicePermission.GET));
permSet.add(new ServicePermission(DeviceKeyService.class.getName(), ServicePermission.GET));
permSet.add(new ServicePermission(LinkService.class.getName(), ServicePermission.GET));
// permSet.add(new ServicePermission(MulticastRouteService.class.getName(), ServicePermission.GET));
@@ -314,12 +314,12 @@
serviceDirectory.put(HOST_EVENT, ImmutableSet.of(
HostService.class.getName()));
serviceDirectory.put(INTENT_READ, ImmutableSet.of(
- IntentService.class.getName(), IntentPartitionService.class.getName(),
+ IntentService.class.getName(), WorkPartitionService.class.getName(),
IntentClockService.class.getName(), IntentExtensionService.class.getName()));
serviceDirectory.put(INTENT_WRITE, ImmutableSet.of(
IntentService.class.getName(), IntentExtensionService.class.getName()));
serviceDirectory.put(INTENT_EVENT, ImmutableSet.of(
- IntentService.class.getName(), IntentPartitionService.class.getName()));
+ IntentService.class.getName(), WorkPartitionService.class.getName()));
// serviceDirectory.put(LINK_READ, ImmutableSet.of(
// LinkService.class.getName(), LinkResourceService.class.getName(),
// LabelResourceService.class.getName()));
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index e914579..2eb1d2e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -32,7 +32,7 @@
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
-import org.onosproject.net.intent.IntentPartitionService;
+import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
@@ -84,7 +84,7 @@
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected IntentPartitionService partitionService;
+ protected WorkPartitionService partitionService;
private final AtomicLong sequenceNumber = new AtomicLong(0);
@@ -211,7 +211,7 @@
}
private Collection<NodeId> getPeerNodes(Key key, IntentData data) {
- NodeId master = partitionService.getLeader(key);
+ NodeId master = partitionService.getLeader(key, Key::hash);
NodeId origin = (data != null) ? data.origin() : null;
if (data != null && (master == null || origin == null)) {
log.debug("Intent {} missing master and/or origin; master = {}, origin = {}",
@@ -283,7 +283,7 @@
@Override
public boolean isMaster(Key intentKey) {
- return partitionService.isMine(intentKey);
+ return partitionService.isMine(intentKey, Key::hash);
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/WorkPartitionManager.java
similarity index 77%
rename from core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
rename to core/store/dist/src/main/java/org/onosproject/store/intent/impl/WorkPartitionManager.java
index ab266b6..337bc55 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/WorkPartitionManager.java
@@ -29,10 +29,9 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
-import org.onosproject.net.intent.IntentPartitionEvent;
-import org.onosproject.net.intent.IntentPartitionEventListener;
-import org.onosproject.net.intent.IntentPartitionService;
-import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.WorkPartitionEvent;
+import org.onosproject.net.intent.WorkPartitionEventListener;
+import org.onosproject.net.intent.WorkPartitionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,17 +43,18 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
- * Manages the assignment of intent keyspace partitions to instances.
+ * Manages the assignment of work partitions to instances.
*/
@Component(immediate = true)
@Service
-public class IntentPartitionManager implements IntentPartitionService {
+public class WorkPartitionManager implements WorkPartitionService {
- private static final Logger log = LoggerFactory.getLogger(IntentPartitionManager.class);
+ private static final Logger log = LoggerFactory.getLogger(WorkPartitionManager.class);
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@@ -72,14 +72,14 @@
private static final int CHECK_PARTITION_BALANCE_PERIOD_SEC = 10;
private static final int RETRY_AFTER_DELAY_SEC = 5;
- private static final String ELECTION_PREFIX = "intent-partition-";
+ private static final String ELECTION_PREFIX = "work-partition-";
protected NodeId localNodeId;
- private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
+ private ListenerRegistry<WorkPartitionEvent, WorkPartitionEventListener> listenerRegistry;
private LeadershipEventListener leaderListener = new InternalLeadershipListener();
private ScheduledExecutorService executor = Executors
- .newScheduledThreadPool(1, groupedThreads("IntentPartition", "balancer-%d", log));
+ .newScheduledThreadPool(1, groupedThreads("work-parition", "balancer-%d", log));
@Activate
public void activate() {
@@ -87,7 +87,7 @@
leadershipService.addListener(leaderListener);
listenerRegistry = new ListenerRegistry<>();
- eventDispatcher.addSink(IntentPartitionEvent.class, listenerRegistry);
+ eventDispatcher.addSink(WorkPartitionEvent.class, listenerRegistry);
for (int i = 0; i < NUM_PARTITIONS; i++) {
leadershipService.runForLeadership(getPartitionPath(i));
@@ -103,7 +103,7 @@
public void deactivate() {
executor.shutdownNow();
- eventDispatcher.removeSink(IntentPartitionEvent.class);
+ eventDispatcher.removeSink(WorkPartitionEvent.class);
leadershipService.removeListener(leaderListener);
log.info("Stopped");
}
@@ -112,9 +112,9 @@
* Sets the specified executor to be used for scheduling background tasks.
*
* @param executor scheduled executor service for background tasks
- * @return this PartitionManager
+ * @return this WorkPartitionManager
*/
- IntentPartitionManager withScheduledExecutor(ScheduledExecutorService executor) {
+ WorkPartitionManager withScheduledExecutor(ScheduledExecutorService executor) {
this.executor = executor;
return this;
}
@@ -123,38 +123,25 @@
return ELECTION_PREFIX + i;
}
- private String getPartitionPath(PartitionId id) {
- return getPartitionPath(id.value());
- }
-
- private PartitionId getPartitionForKey(Key intentKey) {
- int partition = Math.abs((int) intentKey.hash()) % NUM_PARTITIONS;
- //TODO investigate Guava consistent hash method
- // ... does it add significant computational complexity? is it worth it?
- //int partition = consistentHash(intentKey.hash(), NUM_PARTITIONS);
- PartitionId id = new PartitionId(partition);
- return id;
+ @Override
+ public <K> boolean isMine(K id, Function<K, Long> hasher) {
+ return Objects.equals(localNodeId, getLeader(id, hasher));
}
@Override
- public boolean isMine(Key intentKey) {
- return Objects.equals(leadershipService.getLeadership(getPartitionPath(getPartitionForKey(intentKey)))
- .leaderNodeId(),
- localNodeId);
+ public <K> NodeId getLeader(K id, Function<K, Long> hasher) {
+ int partition = Math.abs(hasher.apply(id).intValue()) % NUM_PARTITIONS;
+ PartitionId partitionId = new PartitionId(partition);
+ return leadershipService.getLeadership(getPartitionPath(partitionId.value())).leaderNodeId();
}
@Override
- public NodeId getLeader(Key intentKey) {
- return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
- }
-
- @Override
- public void addListener(IntentPartitionEventListener listener) {
+ public void addListener(WorkPartitionEventListener listener) {
listenerRegistry.addListener(listener);
}
@Override
- public void removeListener(IntentPartitionEventListener listener) {
+ public void removeListener(WorkPartitionEventListener listener) {
listenerRegistry.removeListener(listener);
}
@@ -235,7 +222,7 @@
if (Objects.equals(leadership.leaderNodeId(), localNodeId) &&
leadership.topic().startsWith(ELECTION_PREFIX)) {
- eventDispatcher.post(new IntentPartitionEvent(IntentPartitionEvent.Type.LEADER_CHANGED,
+ eventDispatcher.post(new WorkPartitionEvent(WorkPartitionEvent.Type.LEADER_CHANGED,
leadership.topic()));
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/GossipIntentStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/GossipIntentStoreTest.java
index 5d1259e..e1a4a4f 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/GossipIntentStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/GossipIntentStoreTest.java
@@ -30,7 +30,7 @@
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentTestsMocks;
import org.onosproject.net.intent.MockIdGenerator;
-import org.onosproject.net.intent.IntentPartitionServiceAdapter;
+import org.onosproject.net.intent.WorkPartitionServiceAdapter;
import org.onosproject.store.service.TestStorageService;
import static org.hamcrest.Matchers.is;
@@ -52,7 +52,7 @@
public void setUp() {
intentStore = new GossipIntentStore();
intentStore.storageService = new TestStorageService();
- intentStore.partitionService = new IntentPartitionServiceAdapter();
+ intentStore.partitionService = new WorkPartitionServiceAdapter();
intentStore.clusterService = new ClusterServiceAdapter();
idGenerator = new MockIdGenerator();
Intent.bindIdGenerator(idGenerator);
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/IntentPartitionManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/WorkPartitionManagerTest.java
similarity index 90%
rename from core/store/dist/src/test/java/org/onosproject/store/intent/impl/IntentPartitionManagerTest.java
rename to core/store/dist/src/test/java/org/onosproject/store/intent/impl/WorkPartitionManagerTest.java
index fb44abd..17651f1 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/IntentPartitionManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/WorkPartitionManagerTest.java
@@ -50,9 +50,9 @@
import static org.junit.Assert.assertTrue;
/**
- * Unit tests for the IntentPartitionManager class.
+ * Unit tests for the WorkPartitionManager class.
*/
-public class IntentPartitionManagerTest {
+public class WorkPartitionManagerTest {
private final LeadershipEvent event
= new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
@@ -64,12 +64,12 @@
private static final NodeId OTHER_NODE_ID = new NodeId("other");
private static final NodeId INACTIVE_NODE_ID = new NodeId("inactive");
- private static final String ELECTION_PREFIX = "intent-partition-";
+ private static final String ELECTION_PREFIX = "work-partition-";
private LeadershipService leadershipService;
private LeadershipEventListener leaderListener;
- private IntentPartitionManager partitionManager;
+ private WorkPartitionManager partitionManager;
@Before
public void setUp() {
@@ -77,13 +77,13 @@
leadershipService.addListener(anyObject(LeadershipEventListener.class));
expectLastCall().andDelegateTo(new TestLeadershipService());
- for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
+ for (int i = 0; i < WorkPartitionManager.NUM_PARTITIONS; i++) {
expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
.andReturn(null)
.times(1);
}
- partitionManager = new IntentPartitionManager()
+ partitionManager = new WorkPartitionManager()
.withScheduledExecutor(new NullScheduledExecutor());
partitionManager.clusterService = new TestClusterService();
@@ -109,14 +109,14 @@
.anyTimes();
}
- for (int i = numMine; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
+ for (int i = numMine; i < WorkPartitionManager.NUM_PARTITIONS; i++) {
expect(leadershipService.getLeadership(ELECTION_PREFIX + i))
.andReturn(new Leadership(ELECTION_PREFIX + i,
new Leader(OTHER_NODE_ID, 1, 1000),
allNodes))
.anyTimes();
}
- for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
+ for (int i = 0; i < WorkPartitionManager.NUM_PARTITIONS; i++) {
expect(leadershipService.getCandidates(ELECTION_PREFIX + i))
.andReturn(Arrays.asList(MY_NODE_ID, OTHER_NODE_ID))
.anyTimes();
@@ -133,7 +133,7 @@
leadershipService.addListener(anyObject(LeadershipEventListener.class));
- for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
+ for (int i = 0; i < WorkPartitionManager.NUM_PARTITIONS; i++) {
expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
.andReturn(null)
.times(1);
@@ -159,20 +159,20 @@
Key myKey = new ControllableHashKey(0);
Key notMyKey = new ControllableHashKey(1);
- assertTrue(partitionManager.isMine(myKey));
- assertFalse(partitionManager.isMine(notMyKey));
+ assertTrue(partitionManager.isMine(myKey, Key::hash));
+ assertFalse(partitionManager.isMine(notMyKey, Key::hash));
// Make us the owner of 4 partitions now
reset(leadershipService);
setUpLeadershipService(4);
replay(leadershipService);
- assertTrue(partitionManager.isMine(myKey));
+ assertTrue(partitionManager.isMine(myKey, Key::hash));
// notMyKey is now my key because because we're in control of that
// partition now
- assertTrue(partitionManager.isMine(notMyKey));
+ assertTrue(partitionManager.isMine(notMyKey, Key::hash));
- assertFalse(partitionManager.isMine(new ControllableHashKey(4)));
+ assertFalse(partitionManager.isMine(new ControllableHashKey(4), Key::hash));
}
/**
@@ -183,7 +183,7 @@
@Test
public void testRebalanceScheduling() {
// We have all the partitions so we'll need to relinquish some
- setUpLeadershipService(IntentPartitionManager.NUM_PARTITIONS);
+ setUpLeadershipService(WorkPartitionManager.NUM_PARTITIONS);
replay(leadershipService);
@@ -202,7 +202,7 @@
@Test
public void testRebalance() {
// We have all the partitions so we'll need to relinquish some
- setUpLeadershipService(IntentPartitionManager.NUM_PARTITIONS);
+ setUpLeadershipService(WorkPartitionManager.NUM_PARTITIONS);
leadershipService.withdraw(anyString());
expectLastCall().times(7);
@@ -224,7 +224,7 @@
@Test
public void testNoRebalance() {
// Partitions are already perfectly balanced among the two active instances
- setUpLeadershipService(IntentPartitionManager.NUM_PARTITIONS / 2);
+ setUpLeadershipService(WorkPartitionManager.NUM_PARTITIONS / 2);
replay(leadershipService);
partitionManager.activate();
@@ -236,7 +236,7 @@
reset(leadershipService);
// We have a smaller share than we should
- setUpLeadershipService(IntentPartitionManager.NUM_PARTITIONS / 2 - 1);
+ setUpLeadershipService(WorkPartitionManager.NUM_PARTITIONS / 2 - 1);
replay(leadershipService);
// trigger rebalance
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentService.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentService.java
index 753b2e3..cebc4a4 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentService.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentService.java
@@ -32,7 +32,7 @@
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentListener;
-import org.onosproject.net.intent.IntentPartitionService;
+import org.onosproject.net.intent.WorkPartitionService;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.Key;
@@ -64,7 +64,7 @@
protected IntentService intentService;
protected VirtualNetworkStore store;
- protected IntentPartitionService partitionService;
+ protected WorkPartitionService partitionService;
private final VirtualNetwork network;
private final VirtualNetworkService manager;
@@ -83,7 +83,7 @@
this.manager = virtualNetworkManager;
this.store = serviceDirectory.get(VirtualNetworkStore.class);
this.intentService = serviceDirectory.get(IntentService.class);
- this.partitionService = serviceDirectory.get(IntentPartitionService.class);
+ this.partitionService = serviceDirectory.get(WorkPartitionService.class);
}
@Override
@@ -225,7 +225,7 @@
checkNotNull(intentKey, INTENT_KEY_NULL);
Intent intent = getIntent(intentKey);
checkNotNull(intent, INTENT_NULL);
- return partitionService.isMine(intentKey);
+ return partitionService.isMine(intentKey, Key::hash);
}
@Override
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentServiceTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentServiceTest.java
index 5e2d06c..9f5eb61 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentServiceTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkIntentServiceTest.java
@@ -54,8 +54,8 @@
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentExtensionService;
import org.onosproject.net.intent.IntentListener;
-import org.onosproject.net.intent.IntentPartitionService;
-import org.onosproject.net.intent.IntentPartitionServiceAdapter;
+import org.onosproject.net.intent.WorkPartitionService;
+import org.onosproject.net.intent.WorkPartitionServiceAdapter;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentTestsMocks;
@@ -103,7 +103,7 @@
private VirtualNetworkIntentService vnetIntentService;
private TestIntentCompiler compiler = new TestIntentCompiler();
private IntentExtensionService intentExtensionService;
- private IntentPartitionService intentPartitionService;
+ private WorkPartitionService workPartitionService;
private ServiceDirectory testDirectory;
private TestListener listener = new TestListener();
private IdGenerator idGenerator = new MockIdGenerator();
@@ -142,11 +142,11 @@
withdrawn = new Semaphore(0, true);
purged = new Semaphore(0, true);
- intentPartitionService = new IntentPartitionServiceAdapter();
+ workPartitionService = new WorkPartitionServiceAdapter();
testDirectory = new TestServiceDirectory()
.add(VirtualNetworkStore.class, virtualNetworkManagerStore)
.add(IntentService.class, intentService)
- .add(IntentPartitionService.class, intentPartitionService);
+ .add(WorkPartitionService.class, workPartitionService);
BaseResource.setServiceDirectory(testDirectory);
}
@@ -215,7 +215,7 @@
vnetIntentService = new VirtualNetworkIntentService(manager, virtualNetwork, testDirectory);
vnetIntentService.intentService = intentService;
vnetIntentService.store = virtualNetworkManagerStore;
- vnetIntentService.partitionService = intentPartitionService;
+ vnetIntentService.partitionService = workPartitionService;
return virtualNetwork;
}